You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by ss...@apache.org on 2017/03/01 17:06:10 UTC

tez git commit: TEZ-3643. Long-running AMs can run out of memory due to retained AMContainer instances. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master ee4a9a908 -> 605154203


TEZ-3643. Long-running AMs can run out of memory due to retained
AMContainer instances. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60515420
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60515420
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60515420

Branch: refs/heads/master
Commit: 6051542030101e42738fa2c2da984bb2c744b9c5
Parents: ee4a9a9
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Mar 1 09:05:58 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Mar 1 09:05:58 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/rm/container/AMContainer.java   |   1 +
 .../dag/app/rm/container/AMContainerImpl.java   |   1 +
 .../dag/app/rm/container/AMContainerMap.java    |  40 +++++-
 .../dag/app/rm/container/TestAMContainer.java   |   8 +-
 .../app/rm/container/TestAMContainerMap.java    | 126 +++++++++++++++----
 6 files changed, 146 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3806e27..7538f3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances.
   TEZ-3637. TezMerger logs too much at INFO level
   TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism
   TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount.

http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 8f5034e..5f90a89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -36,4 +36,5 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
   public int getTaskSchedulerIdentifier();
   public int getContainerLauncherIdentifier();
   public int getTaskCommunicatorIdentifier();
+  public boolean isInErrorState();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 5d73a7b..ac429c7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -401,6 +401,7 @@ public class AMContainerImpl implements AMContainer {
     return this.taskCommId;
   }
 
+  @Override
   public boolean isInErrorState() {
     return inError;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index ab43db1..050ffb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -19,8 +19,11 @@
 package org.apache.tez.dag.app.rm.container;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.slf4j.Logger;
@@ -41,7 +44,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   private final TaskCommunicatorManagerInterface tal;
   private final AppContext context;
   private final ContainerSignatureMatcher containerSignatureMatcher;
-  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+  @VisibleForTesting
+  final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
 
   public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
       ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@ -64,11 +68,23 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   }
 
   public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
-    AMContainer amc = new AMContainerImpl(container, chh, tal,
-      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
+    AMContainer amc = createAmContainer(container, chh, tal,
+        containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
+
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 
+  AMContainer createAmContainer(Container container,
+                                ContainerHeartbeatHandler chh,
+                                TaskCommunicatorManagerInterface tal,
+                                ContainerSignatureMatcher signatureMatcher,
+                                AppContext appContext, int schedulerId,
+                                int launcherId, int taskCommId) {
+    AMContainer amc = new AMContainerImpl(container, chh, tal,
+        signatureMatcher, appContext, schedulerId, launcherId, taskCommId);
+    return amc;
+  }
+
   public AMContainer get(ContainerId containerId) {
     return containerMap.get(containerId);
   }
@@ -79,6 +95,24 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
 
   public void dagComplete(DAG dag){
     AMContainerHelpers.dagComplete(dag.getID());
+    // Cleanup completed containers after a query completes.
+    cleanupCompletedContainers();
+  }
+
+  private void cleanupCompletedContainers() {
+    Iterator<Map.Entry<ContainerId, AMContainer>> iterator = containerMap.entrySet().iterator();
+    int count = 0;
+    while (iterator.hasNext()) {
+      Map.Entry<ContainerId, AMContainer> entry = iterator.next();
+      AMContainer amContainer = entry.getValue();
+      if (AMContainerState.COMPLETED.equals(amContainer.getState()) || amContainer.isInErrorState()) {
+        iterator.remove();
+        count++;
+      }
+    }
+    LOG.info(
+        "Cleaned up completed containers on dagComplete. Removed={}, Remaining={}",
+        count, containerMap.size());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index ed14871..4d1bbae 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -1183,7 +1183,7 @@ public class TestAMContainer {
 
   // TODO Verify diagnostics in most of the tests.
 
-  private static class WrappedContainer {
+  static class WrappedContainer {
 
     long rmIdentifier = 2000;
     static final int taskPriority = 10;
@@ -1215,10 +1215,10 @@ public class TestAMContainer {
     public AMContainerImpl amContainer;
 
     @SuppressWarnings("deprecation") // ContainerId
-    public WrappedContainer(boolean shouldProfile, String profileString) {
+    public WrappedContainer(boolean shouldProfile, String profileString, int cIdInt) {
       applicationID = ApplicationId.newInstance(rmIdentifier, 1);
       appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
-      containerID = ContainerId.newInstance(appAttemptID, 1);
+      containerID = ContainerId.newInstance(appAttemptID, cIdInt);
       nodeID = NodeId.newInstance("host", 12500);
       nodeHttpAddress = "host:12501";
       resource = Resource.newInstance(1024, 1);
@@ -1265,7 +1265,7 @@ public class TestAMContainer {
     }
 
     public WrappedContainer() {
-      this(false, null);
+      this(false, null, 1);
     }
 
     protected void mockDAGID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index 2fcd0c8..efea327 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -18,11 +18,15 @@
 
 package org.apache.tez.dag.app.rm.container;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -31,43 +35,117 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.app.TaskCommunicatorWrapper;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.rm.container.TestAMContainer.WrappedContainer;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
+import org.junit.Test;
 
 public class TestAMContainerMap {
 
-  private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
-    return mock(ContainerHeartbeatHandler.class);
-  }
 
-  private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException {
-    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
-    doReturn(taskComm).when(tal).getTaskCommunicator(0);
-    return tal;
-  }
+  @Test (timeout = 10000)
+  public void testCleanupOnDagComplete() {
 
-  private AppContext mockAppContext() {
+    ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
+    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
     AppContext appContext = mock(AppContext.class);
-    return appContext;
-  }
 
-  @SuppressWarnings("deprecation")
-  private ContainerId mockContainerId(int cId) {
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
-    return containerId;
+
+
+    int numContainers = 7;
+    WrappedContainer[] wContainers = new WrappedContainer[numContainers];
+    for (int i = 0 ; i < numContainers ; i++) {
+      WrappedContainer wc =
+          new WrappedContainer(false, null, i);
+      wContainers[i] = wc;
+    }
+
+    AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock(
+        ContainerSignatureMatcher.class), appContext, wContainers);
+
+    for (int i = 0 ; i < numContainers ; i++) {
+      amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0);
+    }
+
+
+    // Container 1 in LAUNCHING state
+    wContainers[0].launchContainer();
+    wContainers[0].verifyState(AMContainerState.LAUNCHING);
+
+    // Container 2 in IDLE state
+    wContainers[1].launchContainer();
+    wContainers[1].containerLaunched();
+    wContainers[1].verifyState(AMContainerState.IDLE);
+
+    // Container 3 RUNNING state
+    wContainers[2].launchContainer();
+    wContainers[2].containerLaunched();
+    wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID);
+    wContainers[2].verifyState(AMContainerState.RUNNING);
+
+    // Cointainer 4 STOP_REQUESTED
+    wContainers[3].launchContainer();
+    wContainers[3].containerLaunched();
+    wContainers[3].stopRequest();
+    wContainers[3].verifyState(AMContainerState.STOP_REQUESTED);
+
+    // Container 5 STOPPING
+    wContainers[4].launchContainer();
+    wContainers[4].containerLaunched();
+    wContainers[4].stopRequest();
+    wContainers[4].nmStopSent();
+    wContainers[4].verifyState(AMContainerState.STOPPING);
+
+    // Container 6 COMPLETED
+    wContainers[5].launchContainer();
+    wContainers[5].containerLaunched();
+    wContainers[5].stopRequest();
+    wContainers[5].nmStopSent();
+    wContainers[5].containerCompleted();
+    wContainers[5].verifyState(AMContainerState.COMPLETED);
+
+    // Container 7 STOP_REQUESTED + ERROR
+    wContainers[6].launchContainer();
+    wContainers[6].containerLaunched();
+    wContainers[6].containerLaunched();
+    assertTrue(wContainers[6].amContainer.isInErrorState());
+    wContainers[6].verifyState(AMContainerState.STOP_REQUESTED);
+
+    // 7 containers present, and registered with AMContainerMap at this point.
+
+    assertEquals(7, amContainerMap.containerMap.size());
+    amContainerMap.dagComplete(mock(DAG.class));
+    assertEquals(5, amContainerMap.containerMap.size());
   }
 
-  private Container mockContainer(ContainerId containerId) {
-    NodeId nodeId = NodeId.newInstance("localhost", 43255);
-    Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
-        Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
-    return container;
+  private static class AMContainerMapForTest extends AMContainerMap {
+
+
+    private WrappedContainer[] wrappedContainers;
+
+    public AMContainerMapForTest(ContainerHeartbeatHandler chh,
+                                 TaskCommunicatorManagerInterface tal,
+                                 ContainerSignatureMatcher containerSignatureMatcher,
+                                 AppContext context, WrappedContainer[] wrappedContainers) {
+      super(chh, tal, containerSignatureMatcher, context);
+      this.wrappedContainers = wrappedContainers;
+    }
+
+    @Override
+    AMContainer createAmContainer(Container container,
+                                  ContainerHeartbeatHandler chh,
+                                  TaskCommunicatorManagerInterface tal,
+                                  ContainerSignatureMatcher signatureMatcher,
+                                  AppContext appContext, int schedulerId,
+                                  int launcherId, int taskCommId) {
+      return wrappedContainers[container.getId().getId()].amContainer;
+    }
+
   }
 }