You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/02/25 01:35:33 UTC
apex-core git commit: APEXCORE-624 Decrement the requested-container count for unallocated (timed-out)
and released containers so that the exit condition of the shutdown check is satisfied.
Repository: apex-core
Updated Branches:
refs/heads/release-3.5 66bf590c8 -> bd8f7bade
APEXCORE-624 Decrement the requested-container count for unallocated (timed-out) and released containers so that the exit condition of the shutdown check is satisfied.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/bd8f7bad
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/bd8f7bad
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/bd8f7bad
Branch: refs/heads/release-3.5
Commit: bd8f7bade65f03e7c7729da383a29cd424664f91
Parents: 66bf590
Author: Sanjay Pujare <sa...@sanjay-dt-mac2.local>
Authored: Sat Feb 18 12:33:31 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Feb 24 17:15:58 2017 -0800
----------------------------------------------------------------------
.../stram/ResourceRequestHandler.java | 1 +
.../stram/StreamingAppMasterService.java | 26 +++++++++-----------
2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index c56f64f..e7f9672 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -81,6 +81,7 @@ public class ResourceRequestHandler
*/
if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+ LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
removedContainerRequests.add(entry.getValue().getRight());
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
entry.getValue().setLeft(loopCounter);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 15b6402..3898dbc 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -705,7 +705,7 @@ public class StreamingAppMasterService extends CompositeService
int loopCounter = -1;
long nodeReportUpdateTime = 0;
List<ContainerId> releasedContainers = new ArrayList<>();
- int numTotalContainers = 0;
+
// keep track of already requested containers to not request them again while waiting for allocation
int numRequestedContainers = 0;
int numReleasedContainers = 0;
@@ -729,7 +729,7 @@ public class StreamingAppMasterService extends CompositeService
dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
ar.getApplicationId().toString(), ar.getName(), ar.getUser());
LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
- finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
+ finishApplication(FinalApplicationStatus.FAILED);
return;
}
resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
@@ -829,7 +829,7 @@ public class StreamingAppMasterService extends CompositeService
resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
- /* Remove nodes from blacklist after timeout */
+ /* Remove nodes from blacklist after timeout */
List<String> blacklistRemovals = new ArrayList<>();
for (String hostname : failedBlackListedNodes) {
Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
@@ -844,8 +844,7 @@ public class StreamingAppMasterService extends CompositeService
failedBlackListedNodes.removeAll(blacklistRemovals);
}
- numTotalContainers += containerRequests.size();
- numRequestedContainers += containerRequests.size();
+ numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
if (amResp.getAMCommand() != null) {
LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -884,7 +883,7 @@ public class StreamingAppMasterService extends CompositeService
LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
releasedContainers.add(allocatedContainer.getId());
numReleasedContainers++;
- numRequestedContainers++;
+ numRequestedContainers--;
continue;
}
if (csr != null) {
@@ -1025,23 +1024,24 @@ public class StreamingAppMasterService extends CompositeService
appDone = true;
}
- LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size());
+ LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+ loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
// monitor child containers
dnmgr.monitorHeartbeat();
}
- finishApplication(finalStatus, numTotalContainers);
+ finishApplication(finalStatus);
}
- private void finishApplication(FinalApplicationStatus finalStatus, int numTotalContainers) throws YarnException, IOException
+ private void finishApplication(FinalApplicationStatus finalStatus) throws YarnException, IOException
{
LOG.info("Application completed. Signalling finish to RM");
FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setFinalApplicationStatus(finalStatus);
if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
- String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get();
+ String diagnostics = "Diagnostics." + " completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get();
if (!StringUtils.isEmpty(dnmgr.shutdownDiagnosticsMessage)) {
diagnostics += "\n";
diagnostics += dnmgr.shutdownDiagnosticsMessage;
@@ -1099,16 +1099,14 @@ public class StreamingAppMasterService extends CompositeService
private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException
{
if (removedContainerRequests.size() > 0) {
- LOG.info(" Removing container request: " + removedContainerRequests);
+ LOG.debug("Removing container request: {}", removedContainerRequests);
for (ContainerRequest cr : removedContainerRequests) {
- LOG.info("Removed container: {}", cr.toString());
amRmClient.removeContainerRequest(cr);
}
}
if (containerRequests.size() > 0) {
- LOG.info("Asking RM for containers: " + containerRequests);
+ LOG.debug("Asking RM for containers: {}", containerRequests);
for (ContainerRequest cr : containerRequests) {
- LOG.info("Requested container: {} on host: [{}]", cr.toString(), StringUtils.join(cr.getNodes(), ", "));
amRmClient.addContainerRequest(cr);
}
}