You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/02/25 01:13:48 UTC

[1/2] apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.

Repository: apex-core
Updated Branches:
  refs/heads/master 911ccb269 -> a6dd73b96


APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/de4c11fe
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/de4c11fe
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/de4c11fe

Branch: refs/heads/master
Commit: de4c11fee820dbaec23c3217bf772af982e83baf
Parents: d80501b
Author: Sanjay Pujare <sa...@Sanjay-DT-Mac2.local>
Authored: Sat Feb 18 12:33:31 2017 -0800
Committer: Sanjay Pujare <sa...@Sanjay-DT-Mac2.local>
Committed: Thu Feb 23 21:41:17 2017 -0800

----------------------------------------------------------------------
 .../stram/ResourceRequestHandler.java           |  1 +
 .../stram/StreamingAppMasterService.java        | 26 +++++++++-----------
 2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/de4c11fe/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index c56f64f..e7f9672 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -81,6 +81,7 @@ public class ResourceRequestHandler
          */
         if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
           StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+          LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
           removedContainerRequests.add(entry.getValue().getRight());
           ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
           entry.getValue().setLeft(loopCounter);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/de4c11fe/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 15b6402..3898dbc 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -705,7 +705,7 @@ public class StreamingAppMasterService extends CompositeService
     int loopCounter = -1;
     long nodeReportUpdateTime = 0;
     List<ContainerId> releasedContainers = new ArrayList<>();
-    int numTotalContainers = 0;
+
     // keep track of already requested containers to not request them again while waiting for allocation
     int numRequestedContainers = 0;
     int numReleasedContainers = 0;
@@ -729,7 +729,7 @@ public class StreamingAppMasterService extends CompositeService
         dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
             ar.getApplicationId().toString(), ar.getName(), ar.getUser());
         LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
-        finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
+        finishApplication(FinalApplicationStatus.FAILED);
         return;
       }
       resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
@@ -829,7 +829,7 @@ public class StreamingAppMasterService extends CompositeService
 
       resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests);
 
-     /* Remove nodes from blacklist after timeout */
+      /* Remove nodes from blacklist after timeout */
       List<String> blacklistRemovals = new ArrayList<>();
       for (String hostname : failedBlackListedNodes) {
         Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime;
@@ -844,8 +844,7 @@ public class StreamingAppMasterService extends CompositeService
         failedBlackListedNodes.removeAll(blacklistRemovals);
       }
 
-      numTotalContainers += containerRequests.size();
-      numRequestedContainers += containerRequests.size();
+      numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
       AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
       if (amResp.getAMCommand() != null) {
         LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -884,7 +883,7 @@ public class StreamingAppMasterService extends CompositeService
           LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
           releasedContainers.add(allocatedContainer.getId());
           numReleasedContainers++;
-          numRequestedContainers++;
+          numRequestedContainers--;
           continue;
         }
         if (csr != null) {
@@ -1025,23 +1024,24 @@ public class StreamingAppMasterService extends CompositeService
         appDone = true;
       }
 
-      LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size());
+      LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+          loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
 
       // monitor child containers
       dnmgr.monitorHeartbeat();
     }
 
-    finishApplication(finalStatus, numTotalContainers);
+    finishApplication(finalStatus);
   }
 
-  private void finishApplication(FinalApplicationStatus finalStatus, int numTotalContainers) throws YarnException, IOException
+  private void finishApplication(FinalApplicationStatus finalStatus) throws YarnException, IOException
   {
     LOG.info("Application completed. Signalling finish to RM");
     FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
     finishReq.setFinalApplicationStatus(finalStatus);
 
     if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
-      String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get();
+      String diagnostics = "Diagnostics." + " completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get();
       if (!StringUtils.isEmpty(dnmgr.shutdownDiagnosticsMessage)) {
         diagnostics += "\n";
         diagnostics += dnmgr.shutdownDiagnosticsMessage;
@@ -1099,16 +1099,14 @@ public class StreamingAppMasterService extends CompositeService
   private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException
   {
     if (removedContainerRequests.size() > 0) {
-      LOG.info(" Removing container request: " + removedContainerRequests);
+      LOG.debug("Removing container request: {}", removedContainerRequests);
       for (ContainerRequest cr : removedContainerRequests) {
-        LOG.info("Removed container: {}", cr.toString());
         amRmClient.removeContainerRequest(cr);
       }
     }
     if (containerRequests.size() > 0) {
-      LOG.info("Asking RM for containers: " + containerRequests);
+      LOG.debug("Asking RM for containers: {}", containerRequests);
       for (ContainerRequest cr : containerRequests) {
-        LOG.info("Requested container: {} on host: [{}]", cr.toString(), StringUtils.join(cr.getNodes(), ", "));
         amRmClient.addContainerRequest(cr);
       }
     }


[2/2] apex-core git commit: Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624

Posted by vr...@apache.org.
Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a6dd73b9
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a6dd73b9
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a6dd73b9

Branch: refs/heads/master
Commit: a6dd73b96b78f5c2509c025fca9fcc96e917f0c1
Parents: 911ccb2 de4c11f
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Feb 24 17:10:12 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Feb 24 17:10:12 2017 -0800

----------------------------------------------------------------------
 .../stram/ResourceRequestHandler.java           |  1 +
 .../stram/StreamingAppMasterService.java        | 26 +++++++++-----------
 2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------