You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2017/08/15 22:09:36 UTC
[apex-core] branch master updated: APEXCORE-743 If the node manager
doesn't kill the container in the customizable time window,
it is forcefully recovered (#543)
This is an automated email from the ASF dual-hosted git repository.
sandesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new 06a8737 APEXCORE-743 If the node manager doesn't kill the container in the customizable time window, it is forcefully recovered (#543)
06a8737 is described below
commit 06a8737cc4a15364ff636e5c5781ba3eefcfad16
Author: Sandesh Hegde <sa...@gmail.com>
AuthorDate: Tue Aug 15 15:09:34 2017 -0700
APEXCORE-743 If the node manager doesn't kill the container in the customizable time window, it is forcefully recovered (#543)
---
.../stram/StreamingAppMasterService.java | 44 +++++++++++++++++-----
1 file changed, 34 insertions(+), 10 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 61a31bd..6c640ee 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -42,6 +42,7 @@ import javax.xml.bind.annotation.XmlElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.PropertiesHelper;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.apex.engine.api.plugin.PluginLocator;
import org.apache.apex.engine.events.grouping.GroupingManager;
@@ -164,6 +165,7 @@ public class StreamingAppMasterService extends CompositeService
private AppDataPushAgent appDataPushAgent;
private ApexPluginDispatcher apexPluginDispatcher;
private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance();
+ private static final long REMOVE_CONTAINER_TIMEOUT = PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 30 * 1000, 0, Long.MAX_VALUE);
public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
{
@@ -1178,14 +1180,18 @@ public class StreamingAppMasterService extends CompositeService
for (String containerIdStr : dnmgr.containerStopRequests.values()) {
AllocatedContainer allocatedContainer = this.allocatedContainers.get(containerIdStr);
- if (allocatedContainer != null && !allocatedContainer.stopRequested) {
- nmClient.stopContainerAsync(allocatedContainer.container.getId(), allocatedContainer.container.getNodeId());
- LOG.info("Requested stop container {}", containerIdStr);
- allocatedContainer.stopRequested = true;
+ if (allocatedContainer != null) {
+ allocatedContainer.stop(nmClient);
}
dnmgr.containerStopRequests.remove(containerIdStr);
}
+ for (Map.Entry<String, AllocatedContainer> entry : allocatedContainers.entrySet()) {
+ if (entry.getValue().checkStopRequestedTimeout()) {
+ LOG.info("Timeout happened for NodeManager kill container request, recovering the container {} without waiting", entry.getKey());
+ recoverHelper(entry.getKey());
+ }
+ }
return amRmClient.allocate(0);
}
@@ -1246,26 +1252,44 @@ public class StreamingAppMasterService extends CompositeService
@Override
public void run()
{
- dnmgr.scheduleContainerRestart(containerId.toString());
- allocatedContainers.remove(containerId.toString());
+ recoverHelper(containerId.toString());
}
-
});
}
}
- private class AllocatedContainer
+ private void recoverHelper(final String containerId)
+ {
+ dnmgr.scheduleContainerRestart(containerId);
+ allocatedContainers.remove(containerId);
+ }
+
+ private static class AllocatedContainer
{
private final Container container;
- private boolean stopRequested;
+ private long stop = Long.MAX_VALUE;
+
private Token<StramDelegationTokenIdentifier> delegationToken;
private AllocatedContainer(Container c)
{
container = c;
}
- }
+ public void stop(NMClientAsync nmClient)
+ {
+ if (stop == Long.MAX_VALUE) {
+ nmClient.stopContainerAsync(container.getId(), container.getNodeId());
+ LOG.info("Requested stop container {}", container.getId().toString());
+ stop = System.currentTimeMillis();
+ }
+ }
+
+ public boolean checkStopRequestedTimeout()
+ {
+ return System.currentTimeMillis() - stop > REMOVE_CONTAINER_TIMEOUT;
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.