You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/04/08 21:17:16 UTC
svn commit: r1465731 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/...
Author: bikas
Date: Mon Apr 8 19:17:16 2013
New Revision: 1465731
URL: http://svn.apache.org/r1465731
Log:
YARN-479. NM retry behavior for connection to RM should be similar for lost heartbeats (Jian He via bikas)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Apr 8 19:17:16 2013
@@ -126,6 +126,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-193. Scheduler.normalizeRequest does not account for allocation
requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
+ YARN-479. NM retry behavior for connection to RM should be similar for
+ lost heartbeats (Jian He via bikas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Apr 8 19:17:16 2013
@@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl exten
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
-
- private boolean previousHeartBeatSucceeded;
- private List<ContainerStatus> previousContainersStatuses =
- new ArrayList<ContainerStatus>();
+ private long rmConnectWaitMS;
+ private long rmConnectionRetryIntervalMS;
+ private boolean waitForEver;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -99,7 +98,6 @@ public class NodeStatusUpdaterImpl exten
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
- this.previousHeartBeatSucceeded = true;
}
@Override
@@ -137,8 +135,8 @@ public class NodeStatusUpdaterImpl exten
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
LOG.info("Initialized nodemanager for " + nodeId + ":" +
- " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
- " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
+ " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
+ " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
super.init(conf);
}
@@ -192,12 +190,12 @@ public class NodeStatusUpdaterImpl exten
private void registerWithRM() throws YarnRemoteException {
Configuration conf = getConfig();
- long rmConnectWaitMS =
+ rmConnectWaitMS =
conf.getInt(
YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
* 1000;
- long rmConnectionRetryIntervalMS =
+ rmConnectionRetryIntervalMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
YarnConfiguration
@@ -210,7 +208,7 @@ public class NodeStatusUpdaterImpl exten
" should not be negative.");
}
- boolean waitForEver = (rmConnectWaitMS == -1000);
+ waitForEver = (rmConnectWaitMS == -1000);
if(! waitForEver) {
if(rmConnectWaitMS < 0) {
@@ -319,14 +317,8 @@ public class NodeStatusUpdaterImpl exten
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
- List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
- if(previousHeartBeatSucceeded) {
- previousContainersStatuses.clear();
- } else {
- containersStatuses.addAll(previousContainersStatuses);
- }
-
int numActiveContainers = 0;
+ List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@@ -341,7 +333,6 @@ public class NodeStatusUpdaterImpl exten
LOG.info("Sending out status for container: " + containerStatus);
if (containerStatus.getState() == ContainerState.COMPLETE) {
- previousContainersStatuses.add(containerStatus);
// Remove
i.remove();
@@ -404,6 +395,9 @@ public class NodeStatusUpdaterImpl exten
while (!isStopped) {
// Send heartbeat
try {
+ NodeHeartbeatResponse response = null;
+ int rmRetryCount = 0;
+ long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
@@ -414,9 +408,31 @@ public class NodeStatusUpdaterImpl exten
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey());
}
- NodeHeartbeatResponse response =
- resourceTracker.nodeHeartbeat(request);
- previousHeartBeatSucceeded = true;
+ while (!isStopped) {
+ try {
+ rmRetryCount++;
+ response = resourceTracker.nodeHeartbeat(request);
+ break;
+ } catch (Throwable e) {
+ LOG.warn("Trying to heartbeat to ResourceManager, "
+ + "current no. of failed attempts is " + rmRetryCount);
+ if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+ || waitForEver) {
+ try {
+ LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ + " seconds before next heartbeat to RM");
+ Thread.sleep(rmConnectionRetryIntervalMS);
+ } catch(InterruptedException ex) {
+ // Interrupted while sleeping between retries; nothing to do, the loop retries the heartbeat.
+ }
+ } else {
+ String errorMessage = "Failed to heartbeat to RM, " +
+ "no. of failed attempts is "+rmRetryCount;
+ LOG.error(errorMessage,e);
+ throw new YarnException(errorMessage,e);
+ }
+ }
+ }
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
// See if the master-key has rolled over
@@ -432,7 +448,7 @@ public class NodeStatusUpdaterImpl exten
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
- " hence shutting down.");
+ " hence shutting down.");
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
@@ -461,8 +477,12 @@ public class NodeStatusUpdaterImpl exten
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
+ } catch (YarnException e) {
+ // Thrown when heartbeat retries exceed the maximum RM connect wait time: request NM shutdown, then rethrow.
+ dispatcher.getEventHandler().handle(
+ new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+ throw e;
} catch (Throwable e) {
- previousHeartBeatSucceeded = false;
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Apr 8 19:17:16 2013
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,6 +170,10 @@ public class TestNodeStatusUpdater {
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
+ NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+ Dispatcher mockDispatcher = mock(Dispatcher.class);
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -183,7 +190,8 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(firstContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(2);
- Container container = new ContainerImpl(conf , null, launchContext, null, null);
+ Container container = new ContainerImpl(conf , mockDispatcher,
+ launchContext, null, mockMetrics);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -207,7 +215,8 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(secondContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(3);
- Container container = new ContainerImpl(conf, null, launchContext, null, null);
+ Container container = new ContainerImpl(conf, mockDispatcher,
+ launchContext, null, mockMetrics);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -229,13 +238,14 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+ public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
+ resourceTracker = new MyResourceTracker(this.context);
}
@Override
@@ -312,6 +322,21 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+ private ResourceTracker resourceTracker;
+
+ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker5();
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+ }
+
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -328,6 +353,32 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeManager2 extends NodeManager {
+ public boolean isStopped = false;
+ private NodeStatusUpdater nodeStatusUpdater;
+ private CyclicBarrier syncBarrier;
+ public MyNodeManager2 (CyclicBarrier syncBarrier) {
+ this.syncBarrier = syncBarrier;
+ }
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ nodeStatusUpdater =
+ new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+ metrics);
+ return nodeStatusUpdater;
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ isStopped = true;
+ try {
+ syncBarrier.await();
+ } catch (Exception e) {
+ }
+ }
+ }
//
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -505,6 +556,26 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyResourceTracker5 implements ResourceTracker {
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction );
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ heartBeatID++;
+ throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -883,6 +954,30 @@ public class TestNodeStatusUpdater {
nm.stop();
}
+ @Test(timeout = 20000)
+ public void testNodeStatusUpdaterRetryAndNMShutdown()
+ throws InterruptedException {
+ final long connectionWaitSecs = 1;
+ final long connectionRetryIntervalSecs = 1;
+ YarnConfiguration conf = createNMConfig();
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ connectionWaitSecs);
+ conf.setLong(YarnConfiguration
+ .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+ connectionRetryIntervalSecs);
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ nm = new MyNodeManager2(syncBarrier);
+ nm.init(conf);
+ nm.start();
+ try {
+ syncBarrier.await();
+ } catch (Exception e) {
+ }
+ Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+ Assert.assertTrue("calculate heartBeatCount based on" +
+ " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
+ }
+
private class MyNMContext extends NMContext {
ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();