You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/03 18:57:40 UTC
svn commit: r1464106 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-...
Author: vinodkv
Date: Wed Apr 3 16:57:40 2013
New Revision: 1464106
URL: http://svn.apache.org/r1464106
Log:
YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1464105 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Apr 3 16:57:40 2013
@@ -125,6 +125,9 @@ Release 2.0.5-beta - UNRELEASED
local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv)
+ YARN-101. Fix NodeManager heartbeat processing to not lose track of completed
+ containers in case of dropped heartbeats. (Xuan Gong via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Apr 3 16:57:40 2013
@@ -119,6 +119,10 @@ public class NodeManager extends Composi
return new DeletionService(exec);
}
+ /**
+ * Factory hook for the NodeManager's {@link NMContext}. NodeManager's
+ * context setup now calls this instead of constructing NMContext directly,
+ * so subclasses (such as tests) can supply their own context.
+ *
+ * @param containerTokenSecretManager secret manager passed to the context
+ * @return a new NMContext wrapping the given secret manager
+ */
+ protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
+ return new NMContext(containerTokenSecretManager);
+ }
+
protected void doSecureLogin() throws IOException {
SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
YarnConfiguration.NM_PRINCIPAL);
@@ -137,7 +141,7 @@ public class NodeManager extends Composi
containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
}
- this.context = new NMContext(containerTokenSecretManager);
+ this.context = createNMContext(containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Apr 3 16:57:40 2013
@@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl exten
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
+ private boolean previousHeartBeatSucceeded;
+ private List<ContainerStatus> previousContainersStatuses =
+ new ArrayList<ContainerStatus>();
+
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(NodeStatusUpdaterImpl.class.getName());
@@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl exten
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
+ this.previousHeartBeatSucceeded = true;
}
@Override
@@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl exten
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
- int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+ if(previousHeartBeatSucceeded) {
+ previousContainersStatuses.clear();
+ } else {
+ containersStatuses.addAll(previousContainersStatuses);
+ }
+
+ int numActiveContainers = 0;
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl exten
LOG.info("Sending out status for container: " + containerStatus);
if (containerStatus.getState() == ContainerState.COMPLETE) {
+ previousContainersStatuses.add(containerStatus);
// Remove
i.remove();
@@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl exten
}
NodeHeartbeatResponse response =
resourceTracker.nodeHeartbeat(request);
+ previousHeartBeatSucceeded = true;
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
// See if the master-key has rolled over
@@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl exten
new CMgrCompletedAppsEvent(appsToCleanup));
}
} catch (Throwable e) {
+ previousHeartBeatSucceeded = false;
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Apr 3 16:57:40 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.service.Service;
@@ -92,6 +97,8 @@ public class TestNodeStatusUpdater {
private final Configuration conf = createNMConfig();
private NodeManager nm;
protected NodeManager rebootedNodeManager;
+ private boolean containerStatusBackupSuccessfully = true;
+ private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@After
public void tearDown() {
@@ -237,6 +244,22 @@ public class TestNodeStatusUpdater {
}
}
+ /**
+ * NodeStatusUpdater whose RM client is a {@link MyResourceTracker4}, which
+ * checks the container statuses carried by each heartbeat. Used by
+ * testCompletedContainerStatusBackup.
+ */
+ private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker;
+
+ public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker4(context);
+ }
+
+ // Route all RM traffic to the scripted tracker instead of a real RM.
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+
+ }
+
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
@@ -384,6 +407,104 @@ public class TestNodeStatusUpdater {
}
}
+ /**
+ * ResourceTracker stub for testCompletedContainerStatusBackup. Registration
+ * returns {@link #registerNodeAction}. Each heartbeat checks the container
+ * statuses reported for the current {@code heartBeatID}:
+ * <ul>
+ * <li>0: no statuses and an empty container map;</li>
+ * <li>1: containers 1-5 (3 and 4 COMPLETE), after which a YarnException is
+ * thrown to simulate a lost heartbeat response;</li>
+ * <li>2: the COMPLETE statuses 3 and 4 from the failed heartbeat re-sent
+ * first, followed by the current containers 1, 2, 5, 6 and 7.</li>
+ * </ul>
+ * Assertion failures are recorded in {@code containerStatusBackupSuccessfully}
+ * instead of being propagated.
+ */
+ private class MyResourceTracker4 implements ResourceTracker {
+
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ private Context context;
+
+ public MyResourceTracker4(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction);
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ // An AssertionError escaping this method would reach the status updater,
+ // whose catch (Throwable) only logs it. Failures are therefore caught
+ // below and recorded in a flag that the test thread checks.
+ // NOTE(review): assertEquals is called as (actual, expected), the reverse
+ // of JUnit's (expected, actual), so failure messages read backwards.
+ try {
+ if (heartBeatID == 0) {
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 0);
+ Assert.assertEquals(context.getContainers().size(), 0);
+ } else if (heartBeatID == 1) {
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 5);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(0).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(0)
+ .getContainerId().getId() == 1);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(1).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(1)
+ .getContainerId().getId() == 2);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(2).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(2)
+ .getContainerId().getId() == 3);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(3).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(3)
+ .getContainerId().getId() == 4);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(4).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(4)
+ .getContainerId().getId() == 5);
+ // Fail this heartbeat so the updater must re-send the COMPLETE
+ // statuses (3 and 4) with the next one.
+ throw new YarnException("Lost the heartbeat response");
+ } else if (heartBeatID == 2) {
+ // The backed-up COMPLETE statuses from heartbeat 1 come first,
+ // followed by the statuses of the containers currently in the map.
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 7);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(0).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(0)
+ .getContainerId().getId() == 3);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(1).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(1)
+ .getContainerId().getId() == 4);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(2).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(2)
+ .getContainerId().getId() == 1);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(3).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(3)
+ .getContainerId().getId() == 2);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(4).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(4)
+ .getContainerId().getId() == 5);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(5).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(5)
+ .getContainerId().getId() == 6);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(6).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(6)
+ .getContainerId().getId() == 7);
+ }
+ } catch (AssertionError error) {
+ LOG.info(error);
+ containerStatusBackupSuccessfully = false;
+ } finally {
+ // Count every call, including the one that throws YarnException above.
+ heartBeatID++;
+ }
+ // Reply with the incremented heartbeat id and the configured node action.
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID);
+ NodeHeartbeatResponse nhResponse =
+ YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+ heartBeatNodeAction, null, null, null, 1000L);
+ return nhResponse;
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -725,6 +846,127 @@ public class TestNodeStatusUpdater {
}
}
+ /**
+ * Tests that statuses of completed containers are backed up and re-sent on
+ * the next heartbeat when a heartbeat is lost. The NodeManager is wired with
+ * {@link MyNodeStatusUpdater2}, whose tracker fails heartbeat 1, and
+ * {@link MyNMContext}, which supplies scripted containers per heartbeat.
+ */
+ @Test(timeout = 20000)
+ public void testCompletedContainerStatusBackup() throws Exception {
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ MyNodeStatusUpdater2 myNodeStatusUpdater =
+ new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
+ metrics);
+ return myNodeStatusUpdater;
+ }
+
+ // Uses the createNMContext hook to inject the scripted context.
+ @Override
+ protected NMContext createNMContext(
+ NMContainerTokenSecretManager containerTokenSecretManager) {
+ return new MyNMContext(containerTokenSecretManager);
+ }
+
+ };
+
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+
+ // Poll (at most 20 times, 500 ms apart) until heartBeatID exceeds 3.
+ // NOTE(review): if the wait runs out first, the check below still passes
+ // as long as no assertion has failed yet, even when heartbeat 2 (the one
+ // verifying the backup) was never reached.
+ int waitCount = 0;
+ while (heartBeatID <= 3 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ // MyResourceTracker4 clears this flag when any of its assertions fails.
+ // NOTE(review): when Assert.fail fires, nm.stop() below is skipped;
+ // presumably tearDown() stops nm — verify.
+ if(!containerStatusBackupSuccessfully) {
+ Assert.fail("ContainerStatus Backup failed");
+ }
+ nm.stop();
+ }
+
+ /**
+ * NMContext whose container map is scripted by {@code heartBeatID}:
+ * <ul>
+ * <li>heartbeat 0: the map is empty;</li>
+ * <li>heartbeat 1: containers 1-5 are added (3 and 4 COMPLETE);</li>
+ * <li>heartbeat 2: containers 6 and 7 are added (7 COMPLETE);</li>
+ * <li>later: the map is cleared.</li>
+ * </ul>
+ * COMPLETE statuses are also appended to {@code completedContainerStatusList}.
+ * NOTE(review): that list is not read anywhere in the visible code.
+ */
+ private class MyNMContext extends NMContext {
+ // Returned by getContainers() in place of the superclass's map. It is a
+ // sorted map, presumably so iteration follows ContainerId order, which the
+ // index-based assertions in MyResourceTracker4 depend on.
+ ConcurrentMap<ContainerId, Container> containers =
+ new ConcurrentSkipListMap<ContainerId, Container>();
+
+ public MyNMContext(NMContainerTokenSecretManager
+ containerTokenSecretManager) {
+ super(containerTokenSecretManager);
+ }
+
+ // NOTE(review): every call made while heartBeatID is 1 or 2 creates fresh
+ // statuses whose ContainerIds embed System.currentTimeMillis(). Repeated
+ // calls within one heartbeat could therefore add duplicate containers; the
+ // test assumes a single call per heartbeat in those phases.
+ @Override
+ public ConcurrentMap<ContainerId, Container> getContainers() {
+ if (heartBeatID == 0) {
+ return containers;
+ } else if (heartBeatID == 1) {
+ ContainerStatus containerStatus1 =
+ createContainerStatus(1, ContainerState.RUNNING);
+ Container container1 = getMockContainer(containerStatus1);
+ containers.put(containerStatus1.getContainerId(), container1);
+
+ ContainerStatus containerStatus2 =
+ createContainerStatus(2, ContainerState.RUNNING);
+ Container container2 = getMockContainer(containerStatus2);
+ containers.put(containerStatus2.getContainerId(), container2);
+
+ ContainerStatus containerStatus3 =
+ createContainerStatus(3, ContainerState.COMPLETE);
+ Container container3 = getMockContainer(containerStatus3);
+ containers.put(containerStatus3.getContainerId(), container3);
+ completedContainerStatusList.add(containerStatus3);
+
+ ContainerStatus containerStatus4 =
+ createContainerStatus(4, ContainerState.COMPLETE);
+ Container container4 = getMockContainer(containerStatus4);
+ containers.put(containerStatus4.getContainerId(), container4);
+ completedContainerStatusList.add(containerStatus4);
+
+ ContainerStatus containerStatus5 =
+ createContainerStatus(5, ContainerState.RUNNING);
+ Container container5 = getMockContainer(containerStatus5);
+ containers.put(containerStatus5.getContainerId(), container5);
+
+ return containers;
+ } else if (heartBeatID == 2) {
+ // Containers 3 and 4 are gone by now: the status updater removed them
+ // from this map when it reported them as COMPLETE in heartbeat 1.
+ ContainerStatus containerStatus6 =
+ createContainerStatus(6, ContainerState.RUNNING);
+ Container container6 = getMockContainer(containerStatus6);
+ containers.put(containerStatus6.getContainerId(), container6);
+
+ ContainerStatus containerStatus7 =
+ createContainerStatus(7, ContainerState.COMPLETE);
+ Container container7 = getMockContainer(containerStatus7);
+ containers.put(containerStatus7.getContainerId(), container7);
+ completedContainerStatusList.add(containerStatus7);
+
+ return containers;
+ } else {
+ // After the scripted heartbeats, report no containers.
+ containers.clear();
+
+ return containers;
+ }
+ }
+
+ // Builds a status whose application, attempt and container ids all use
+ // the given id. The application's timestamp is the current time.
+ private ContainerStatus createContainerStatus(int id,
+ ContainerState containerState) {
+ ApplicationId applicationId =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
+ ApplicationAttemptId applicationAttemptId =
+ BuilderUtils.newApplicationAttemptId(applicationId, id);
+ ContainerId contaierId =
+ BuilderUtils.newContainerId(applicationAttemptId, id);
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(contaierId, containerState,
+ "test_containerStatus: id=" + id + ", containerState: "
+ + containerState, 0);
+ return containerStatus;
+ }
+
+ // Mockito mock whose cloneAndGetContainerStatus() returns the given status.
+ private Container getMockContainer(ContainerStatus containerStatus) {
+ Container container = mock(Container.class);
+ when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
+ return container;
+ }
+ }
+
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);