You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2017/08/15 22:09:36 UTC

[apex-core] branch master updated: APEXCORE-743 If the node manager doesn't kill the container in the customizable time window, it is forcefully recovered (#543)

This is an automated email from the ASF dual-hosted git repository.

sandesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 06a8737  APEXCORE-743 If the node manager doesn't kill the container in the customizable time window, it is forcefully recovered (#543)
06a8737 is described below

commit 06a8737cc4a15364ff636e5c5781ba3eefcfad16
Author: Sandesh Hegde <sa...@gmail.com>
AuthorDate: Tue Aug 15 15:09:34 2017 -0700

    APEXCORE-743 If the node manager doesn't kill the container in the customizable time window, it is forcefully recovered (#543)
---
 .../stram/StreamingAppMasterService.java           | 44 +++++++++++++++++-----
 1 file changed, 34 insertions(+), 10 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 61a31bd..6c640ee 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -42,6 +42,7 @@ import javax.xml.bind.annotation.XmlElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.PropertiesHelper;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 import org.apache.apex.engine.api.plugin.PluginLocator;
 import org.apache.apex.engine.events.grouping.GroupingManager;
@@ -164,6 +165,7 @@ public class StreamingAppMasterService extends CompositeService
   private AppDataPushAgent appDataPushAgent;
   private ApexPluginDispatcher apexPluginDispatcher;
   private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
+  private static final long REMOVE_CONTAINER_TIMEOUT = PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 30 * 1000, 0, Long.MAX_VALUE);
 
   public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
   {
@@ -1178,14 +1180,18 @@ public class StreamingAppMasterService extends CompositeService
 
     for (String containerIdStr : dnmgr.containerStopRequests.values()) {
       AllocatedContainer allocatedContainer = this.allocatedContainers.get(containerIdStr);
-      if (allocatedContainer != null && !allocatedContainer.stopRequested) {
-        nmClient.stopContainerAsync(allocatedContainer.container.getId(), allocatedContainer.container.getNodeId());
-        LOG.info("Requested stop container {}", containerIdStr);
-        allocatedContainer.stopRequested = true;
+      if (allocatedContainer != null) {
+        allocatedContainer.stop(nmClient);
       }
       dnmgr.containerStopRequests.remove(containerIdStr);
     }
 
+    for (Map.Entry<String, AllocatedContainer> entry : allocatedContainers.entrySet()) {
+      if (entry.getValue().checkStopRequestedTimeout()) {
+        LOG.info("Timeout happened for NodeManager kill container request, recovering the container {} without waiting", entry.getKey());
+        recoverHelper(entry.getKey());
+      }
+    }
     return amRmClient.allocate(0);
   }
 
@@ -1246,26 +1252,44 @@ public class StreamingAppMasterService extends CompositeService
         @Override
         public void run()
         {
-          dnmgr.scheduleContainerRestart(containerId.toString());
-          allocatedContainers.remove(containerId.toString());
+          recoverHelper(containerId.toString());
         }
-
       });
     }
 
   }
 
-  private class AllocatedContainer
+  private void recoverHelper(final String containerId)
+  {
+    dnmgr.scheduleContainerRestart(containerId);
+    allocatedContainers.remove(containerId);
+  }
+
+  private static class AllocatedContainer
   {
     private final Container container;
-    private boolean stopRequested;
+    private long stop = Long.MAX_VALUE;
+
     private Token<StramDelegationTokenIdentifier> delegationToken;
 
     private AllocatedContainer(Container c)
     {
       container = c;
     }
-  }
 
+    public void stop(NMClientAsync nmClient)
+    {
+      if (stop == Long.MAX_VALUE) {
+        nmClient.stopContainerAsync(container.getId(), container.getNodeId());
+        LOG.info("Requested stop container {}", container.getId().toString());
+        stop = System.currentTimeMillis();
+      }
+    }
+
+    public boolean checkStopRequestedTimeout()
+    {
+      return System.currentTimeMillis() - stop > REMOVE_CONTAINER_TIMEOUT;
+    }
+  }
 }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.