You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:04 UTC
[23/50] [abbrv] tez git commit: TEZ-3643. Long running AMs can go out
of memory due to retained AMContainer instances. (sseth)
TEZ-3643. Long running AMs can go out of memory due to retained
AMContainer instances. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60515420
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60515420
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60515420
Branch: refs/heads/TEZ-1190
Commit: 6051542030101e42738fa2c2da984bb2c744b9c5
Parents: ee4a9a9
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Mar 1 09:05:58 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Mar 1 09:05:58 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/rm/container/AMContainer.java | 1 +
.../dag/app/rm/container/AMContainerImpl.java | 1 +
.../dag/app/rm/container/AMContainerMap.java | 40 +++++-
.../dag/app/rm/container/TestAMContainer.java | 8 +-
.../app/rm/container/TestAMContainerMap.java | 126 +++++++++++++++----
6 files changed, 146 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3806e27..7538f3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances.
TEZ-3637. TezMerger logs too much at INFO level
TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism
TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount.
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 8f5034e..5f90a89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -36,4 +36,5 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
public int getTaskSchedulerIdentifier();
public int getContainerLauncherIdentifier();
public int getTaskCommunicatorIdentifier();
+ public boolean isInErrorState();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 5d73a7b..ac429c7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -401,6 +401,7 @@ public class AMContainerImpl implements AMContainer {
return this.taskCommId;
}
+ @Override
public boolean isInErrorState() {
return inError;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index ab43db1..050ffb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -19,8 +19,11 @@
package org.apache.tez.dag.app.rm.container;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.slf4j.Logger;
@@ -41,7 +44,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
private final TaskCommunicatorManagerInterface tal;
private final AppContext context;
private final ContainerSignatureMatcher containerSignatureMatcher;
- private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+ @VisibleForTesting
+ final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@ -64,11 +68,23 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
}
public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
- AMContainer amc = new AMContainerImpl(container, chh, tal,
- containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
+ AMContainer amc = createAmContainer(container, chh, tal,
+ containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
+
return (containerMap.putIfAbsent(container.getId(), amc) == null);
}
+ AMContainer createAmContainer(Container container,
+ ContainerHeartbeatHandler chh,
+ TaskCommunicatorManagerInterface tal,
+ ContainerSignatureMatcher signatureMatcher,
+ AppContext appContext, int schedulerId,
+ int launcherId, int taskCommId) {
+ AMContainer amc = new AMContainerImpl(container, chh, tal,
+ signatureMatcher, appContext, schedulerId, launcherId, taskCommId);
+ return amc;
+ }
+
public AMContainer get(ContainerId containerId) {
return containerMap.get(containerId);
}
@@ -79,6 +95,24 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
public void dagComplete(DAG dag){
AMContainerHelpers.dagComplete(dag.getID());
+ // Clean up completed containers after a query completes.
+ cleanupCompletedContainers();
+ }
+
+ private void cleanupCompletedContainers() {
+ Iterator<Map.Entry<ContainerId, AMContainer>> iterator = containerMap.entrySet().iterator();
+ int count = 0;
+ while (iterator.hasNext()) {
+ Map.Entry<ContainerId, AMContainer> entry = iterator.next();
+ AMContainer amContainer = entry.getValue();
+ if (AMContainerState.COMPLETED.equals(amContainer.getState()) || amContainer.isInErrorState()) {
+ iterator.remove();
+ count++;
+ }
+ }
+ LOG.info(
+ "Cleaned up completed containers on dagComplete. Removed={}, Remaining={}",
+ count, containerMap.size());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index ed14871..4d1bbae 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -1183,7 +1183,7 @@ public class TestAMContainer {
// TODO Verify diagnostics in most of the tests.
- private static class WrappedContainer {
+ static class WrappedContainer {
long rmIdentifier = 2000;
static final int taskPriority = 10;
@@ -1215,10 +1215,10 @@ public class TestAMContainer {
public AMContainerImpl amContainer;
@SuppressWarnings("deprecation") // ContainerId
- public WrappedContainer(boolean shouldProfile, String profileString) {
+ public WrappedContainer(boolean shouldProfile, String profileString, int cIdInt) {
applicationID = ApplicationId.newInstance(rmIdentifier, 1);
appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
- containerID = ContainerId.newInstance(appAttemptID, 1);
+ containerID = ContainerId.newInstance(appAttemptID, cIdInt);
nodeID = NodeId.newInstance("host", 12500);
nodeHttpAddress = "host:12501";
resource = Resource.newInstance(1024, 1);
@@ -1265,7 +1265,7 @@ public class TestAMContainer {
}
public WrappedContainer() {
- this(false, null);
+ this(false, null, 1);
}
protected void mockDAGID() {
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index 2fcd0c8..efea327 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -18,11 +18,15 @@
package org.apache.tez.dag.app.rm.container;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -31,43 +35,117 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.app.TaskCommunicatorWrapper;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.rm.container.TestAMContainer.WrappedContainer;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.serviceplugins.api.ServicePluginException;
+import org.junit.Test;
public class TestAMContainerMap {
- private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
- return mock(ContainerHeartbeatHandler.class);
- }
- private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException {
- TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
- doReturn(taskComm).when(tal).getTaskCommunicator(0);
- return tal;
- }
+ @Test (timeout = 10000)
+ public void testCleanupOnDagComplete() {
- private AppContext mockAppContext() {
+ ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
+ TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
AppContext appContext = mock(AppContext.class);
- return appContext;
- }
- @SuppressWarnings("deprecation")
- private ContainerId mockContainerId(int cId) {
- ApplicationId appId = ApplicationId.newInstance(1000, 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
- return containerId;
+
+
+ int numContainers = 7;
+ WrappedContainer[] wContainers = new WrappedContainer[numContainers];
+ for (int i = 0 ; i < numContainers ; i++) {
+ WrappedContainer wc =
+ new WrappedContainer(false, null, i);
+ wContainers[i] = wc;
+ }
+
+ AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock(
+ ContainerSignatureMatcher.class), appContext, wContainers);
+
+ for (int i = 0 ; i < numContainers ; i++) {
+ amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0);
+ }
+
+
+ // Container 1 in LAUNCHING state
+ wContainers[0].launchContainer();
+ wContainers[0].verifyState(AMContainerState.LAUNCHING);
+
+ // Container 2 in IDLE state
+ wContainers[1].launchContainer();
+ wContainers[1].containerLaunched();
+ wContainers[1].verifyState(AMContainerState.IDLE);
+
+ // Container 3 RUNNING state
+ wContainers[2].launchContainer();
+ wContainers[2].containerLaunched();
+ wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID);
+ wContainers[2].verifyState(AMContainerState.RUNNING);
+
+ // Container 4 STOP_REQUESTED
+ wContainers[3].launchContainer();
+ wContainers[3].containerLaunched();
+ wContainers[3].stopRequest();
+ wContainers[3].verifyState(AMContainerState.STOP_REQUESTED);
+
+ // Container 5 STOPPING
+ wContainers[4].launchContainer();
+ wContainers[4].containerLaunched();
+ wContainers[4].stopRequest();
+ wContainers[4].nmStopSent();
+ wContainers[4].verifyState(AMContainerState.STOPPING);
+
+ // Container 6 COMPLETED
+ wContainers[5].launchContainer();
+ wContainers[5].containerLaunched();
+ wContainers[5].stopRequest();
+ wContainers[5].nmStopSent();
+ wContainers[5].containerCompleted();
+ wContainers[5].verifyState(AMContainerState.COMPLETED);
+
+ // Container 7 STOP_REQUESTED + ERROR
+ wContainers[6].launchContainer();
+ wContainers[6].containerLaunched();
+ wContainers[6].containerLaunched();
+ assertTrue(wContainers[6].amContainer.isInErrorState());
+ wContainers[6].verifyState(AMContainerState.STOP_REQUESTED);
+
+ // 7 containers present, and registered with AMContainerMap at this point.
+
+ assertEquals(7, amContainerMap.containerMap.size());
+ amContainerMap.dagComplete(mock(DAG.class));
+ assertEquals(5, amContainerMap.containerMap.size());
}
- private Container mockContainer(ContainerId containerId) {
- NodeId nodeId = NodeId.newInstance("localhost", 43255);
- Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
- Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
- return container;
+ private static class AMContainerMapForTest extends AMContainerMap {
+
+
+ private WrappedContainer[] wrappedContainers;
+
+ public AMContainerMapForTest(ContainerHeartbeatHandler chh,
+ TaskCommunicatorManagerInterface tal,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ AppContext context, WrappedContainer[] wrappedContainers) {
+ super(chh, tal, containerSignatureMatcher, context);
+ this.wrappedContainers = wrappedContainers;
+ }
+
+ @Override
+ AMContainer createAmContainer(Container container,
+ ContainerHeartbeatHandler chh,
+ TaskCommunicatorManagerInterface tal,
+ ContainerSignatureMatcher signatureMatcher,
+ AppContext appContext, int schedulerId,
+ int launcherId, int taskCommId) {
+ return wrappedContainers[container.getId().getId()].amContainer;
+ }
+
}
}