You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/11 04:00:49 UTC
svn commit: r1466752 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/p...
Author: vinodkv
Date: Thu Apr 11 02:00:47 2013
New Revision: 1466752
URL: http://svn.apache.org/r1466752
Log:
YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Apr 11 02:00:47 2013
@@ -135,6 +135,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-479. NM retry behavior for connection to RM should be similar for
lost heartbeats (Jian He via bikas)
+ YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
+ containers and re-register with RM. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java Thu Apr 11 02:00:47 2013
@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.ap
*/
public enum NodeAction {
- NORMAL, REBOOT, SHUTDOWN
+ NORMAL, RESYNC, SHUTDOWN
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Thu Apr 11 02:00:47 2013
@@ -25,7 +25,7 @@ import "yarn_protos.proto";
enum NodeActionProto {
NORMAL = 0;
- REBOOT = 1;
+ RESYNC = 1;
SHUTDOWN = 2;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Apr 11 02:00:47 2013
@@ -81,6 +81,7 @@ public class NodeManager extends Composi
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
+ private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private long waitForContainersOnShutdownMillis;
@@ -163,7 +164,7 @@ public class NodeManager extends Composi
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
- NodeStatusUpdater nodeStatusUpdater =
+ nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@@ -214,35 +215,67 @@ public class NodeManager extends Composi
if (isStopping.getAndSet(true)) {
return;
}
-
- cleanupContainers();
+
+ cleanupContainers(NodeManagerEventType.SHUTDOWN);
super.stop();
DefaultMetricsSystem.shutdown();
}
-
+
+ protected void cleanupContainersOnResync() {
+ //we do not want to block dispatcher thread here
+ new Thread() {
+ @Override
+ public void run() {
+ cleanupContainers(NodeManagerEventType.RESYNC);
+ ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
+ }
+ }.start();
+ }
+
@SuppressWarnings("unchecked")
- protected void cleanupContainers() {
+ protected void cleanupContainers(NodeManagerEventType eventType) {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
}
- LOG.info("Containers still running on shutdown: " + containers.keySet());
+ LOG.info("Containers still running on " + eventType + " : "
+ + containers.keySet());
- List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
+ List<ContainerId> containerIds =
+ new ArrayList<ContainerId>(containers.keySet());
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for containers to be killed");
- long waitStartTime = System.currentTimeMillis();
- while (!containers.isEmpty() &&
- System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- LOG.warn("Interrupted while sleeping on container kill", ex);
+ switch (eventType) {
+ case SHUTDOWN:
+ long waitStartTime = System.currentTimeMillis();
+ while (!containers.isEmpty()
+ && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while sleeping on container kill on shutdown",
+ ex);
+ }
}
+ break;
+ case RESYNC:
+ while (!containers.isEmpty()) {
+ try {
+ Thread.sleep(1000);
+ //to remove done containers from the map
+ nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while sleeping on container kill on resync",
+ ex);
+ }
+ }
+ break;
+ default:
+ LOG.warn("Invalid eventType: " + eventType);
}
// All containers killed
@@ -342,9 +375,8 @@ public class NodeManager extends Composi
case SHUTDOWN:
stop();
break;
- case REBOOT:
- stop();
- reboot();
+ case RESYNC:
+ cleanupContainersOnResync();
break;
default:
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
@@ -361,6 +393,11 @@ public class NodeManager extends Composi
return containerManager;
}
+ //For testing
+ Dispatcher getNMDispatcher(){
+ return dispatcher;
+ }
+
@VisibleForTesting
Context getNMContext() {
return this.context;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java Thu Apr 11 02:00:47 2013
@@ -18,5 +18,5 @@
package org.apache.hadoop.yarn.server.nodemanager;
public enum NodeManagerEventType {
- SHUTDOWN, REBOOT
+ SHUTDOWN, RESYNC
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Thu Apr 11 02:00:47 2013
@@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.service.Service;
public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat();
+ NodeStatus getNodeStatusAndUpdateContainersInContext();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Apr 11 02:00:47 2013
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.annotations.VisibleForTesting;
+
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@@ -91,6 +93,9 @@ public class NodeStatusUpdaterImpl exten
private long rmConnectionRetryIntervalMS;
private boolean waitForEver;
+ private Runnable statusUpdaterRunnable;
+ private Thread statusUpdater;
+
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(NodeStatusUpdaterImpl.class.getName());
@@ -169,6 +174,22 @@ public class NodeStatusUpdaterImpl exten
this.isStopped = true;
super.stop();
}
+
+ protected void rebootNodeStatusUpdater() {
+ // Interrupt the updater.
+ this.isStopped = true;
+
+ try {
+ statusUpdater.join();
+ registerWithRM();
+ statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
+ this.isStopped = false;
+ statusUpdater.start();
+ LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
+ } catch (Exception e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
private boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
@@ -188,7 +209,8 @@ public class NodeStatusUpdaterImpl exten
conf);
}
- private void registerWithRM() throws YarnRemoteException {
+ @VisibleForTesting
+ protected void registerWithRM() throws YarnRemoteException {
Configuration conf = getConfig();
rmConnectWaitMS =
conf.getInt(
@@ -312,7 +334,7 @@ public class NodeStatusUpdaterImpl exten
return appList;
}
- private NodeStatus getNodeStatus() {
+ public NodeStatus getNodeStatusAndUpdateContainersInContext() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
@@ -387,7 +409,7 @@ public class NodeStatusUpdaterImpl exten
protected void startStatusUpdater() {
- new Thread("Node Status Updater") {
+ statusUpdaterRunnable = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
@@ -398,7 +420,7 @@ public class NodeStatusUpdaterImpl exten
NodeHeartbeatResponse response = null;
int rmRetryCount = 0;
long waitStartTime = System.currentTimeMillis();
- NodeStatus nodeStatus = getNodeStatus();
+ NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
@@ -453,11 +475,11 @@ public class NodeStatusUpdaterImpl exten
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
}
- if (response.getNodeAction() == NodeAction.REBOOT) {
+ if (response.getNodeAction() == NodeAction.RESYNC) {
LOG.info("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
dispatcher.getEventHandler().handle(
- new NodeManagerEvent(NodeManagerEventType.REBOOT));
+ new NodeManagerEvent(NodeManagerEventType.RESYNC));
break;
}
@@ -500,6 +522,9 @@ public class NodeStatusUpdaterImpl exten
}
}
}
- }.start();
+ };
+ statusUpdater =
+ new Thread(statusUpdaterRunnable, "Node Status Updater");
+ statusUpdater.start();
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Thu Apr 11 02:00:47 2013
@@ -160,7 +160,10 @@ public class TestNodeManagerReboot {
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
- nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+ // restart the NodeManager
+ nm.stop();
+ nm = new MyNodeManager();
+ nm.start();
numTries = 0;
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
@@ -250,26 +253,6 @@ public class TestNodeManagerReboot {
return delService;
}
- // mimic part of reboot process
- @Override
- public void handle(NodeManagerEvent event) {
- switch (event.getType()) {
- case SHUTDOWN:
- this.stop();
- break;
- case REBOOT:
- this.stop();
- this.createNewMyNodeManager().start();
- break;
- default:
- LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
- }
- }
-
- private MyNodeManager createNewMyNodeManager() {
- return new MyNodeManager();
- }
-
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Thu Apr 11 02:00:47 2013
@@ -28,6 +28,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
@@ -71,6 +77,7 @@ public class TestNodeManagerShutdown {
.getRecordFactory(null);
static final String user = "nobody";
private FileContext localFS;
+ private CyclicBarrier syncBarrier = new CyclicBarrier(2);
@Before
public void setup() throws UnsupportedFileSystemException {
@@ -91,11 +98,64 @@ public class TestNodeManagerShutdown {
NodeManager nm = getNodeManager();
nm.init(createNMConfig());
nm.start();
+ startContainers(nm);
+ final int MAX_TRIES=20;
+ int numTries = 0;
+ while (!processStartFile.exists() && numTries < MAX_TRIES) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ex) {ex.printStackTrace();}
+ numTries++;
+ }
+
+ nm.stop();
+
+ // Now verify the contents of the file
+ // Script generates a message when it receives a sigterm
+ // so we look for that
+ BufferedReader reader =
+ new BufferedReader(new FileReader(processStartFile));
+
+ boolean foundSigTermMessage = false;
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line.contains("SIGTERM")) {
+ foundSigTermMessage = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+ reader.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testKillContainersOnResync() throws IOException, InterruptedException {
+ NodeManager nm = new TestNodeManager();
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+ startContainers(nm);
+
+ assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
+ nm.getNMDispatcher().getEventHandler().
+ handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
+ try {
+ syncBarrier.await();
+ } catch (BrokenBarrierException e) {
+ }
+ assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
+ }
+
+ private void startContainers(NodeManager nm) throws IOException {
ContainerManagerImpl containerManager = nm.getContainerManager();
File scriptFile = createUnhaltingScriptFile();
- ContainerLaunchContext containerLaunchContext =
+ ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// Construct the Container-id
@@ -127,7 +187,8 @@ public class TestNodeManagerShutdown {
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(1024);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ StartContainerRequest startRequest =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(startRequest);
@@ -137,37 +198,6 @@ public class TestNodeManagerShutdown {
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
-
- final int MAX_TRIES=20;
- int numTries = 0;
- while (!processStartFile.exists() && numTries < MAX_TRIES) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ex) {ex.printStackTrace();}
- numTries++;
- }
-
- nm.stop();
-
- // Now verify the contents of the file
- // Script generates a message when it receives a sigterm
- // so we look for that
- BufferedReader reader =
- new BufferedReader(new FileReader(processStartFile));
-
- boolean foundSigTermMessage = false;
- while (true) {
- String line = reader.readLine();
- if (line == null) {
- break;
- }
- if (line.contains("SIGTERM")) {
- foundSigTermMessage = true;
- break;
- }
- }
- Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
- reader.close();
}
private ContainerId createContainerId() {
@@ -226,4 +256,48 @@ public class TestNodeManagerShutdown {
}
};
}
+
+ class TestNodeManager extends NodeManager {
+
+ private int registrationCount = 0;
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new TestNodeStatusUpdaterImpl(context, dispatcher,
+ healthChecker, metrics);
+ }
+
+ public int getNMRegistrationCount() {
+ return registrationCount;
+ }
+
+ class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
+
+ public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ }
+
+ @Override
+ protected void registerWithRM() throws YarnRemoteException {
+ super.registerWithRM();
+ registrationCount++;
+ }
+
+ @Override
+ protected void rebootNodeStatusUpdater() {
+ ConcurrentMap<ContainerId, Container> containers =
+ getNMContext().getContainers();
+ // ensure that containers are empty before restart nodeStatusUpdater
+ Assert.assertTrue(containers.isEmpty());
+ super.rebootNodeStatusUpdater();
+ try {
+ syncBarrier.await();
+ } catch (InterruptedException e) {
+ } catch (BrokenBarrierException e) {
+ }
+ }
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Apr 11 02:00:47 2013
@@ -99,7 +99,6 @@ public class TestNodeStatusUpdater {
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = createNMConfig();
private NodeManager nm;
- protected NodeManager rebootedNodeManager;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@@ -663,8 +662,8 @@ public class TestNodeStatusUpdater {
}
@Override
- protected void cleanupContainers() {
- super.cleanupContainers();
+ protected void cleanupContainers(NodeManagerEventType eventType) {
+ super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
@@ -718,50 +717,6 @@ public class TestNodeStatusUpdater {
}
@Test
- public void testNodeReboot() throws Exception {
- nm = getNodeManager(NodeAction.REBOOT);
- YarnConfiguration conf = createNMConfig();
- nm.init(conf);
- Assert.assertEquals(STATE.INITED, nm.getServiceState());
- nm.start();
-
- int waitCount = 0;
- while (heartBeatID < 1 && waitCount++ != 20) {
- Thread.sleep(500);
- }
- Assert.assertFalse(heartBeatID < 1);
-
- // NM takes a while to reach the STOPPED state.
- waitCount = 0;
- while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
- LOG.info("Waiting for NM to stop..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
-
- waitCount = 0;
- while (null == rebootedNodeManager && waitCount++ != 20) {
- LOG.info("Waiting for NM to reinitialize..");
- Thread.sleep(1000);
- }
-
- waitCount = 0;
- while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
- LOG.info("Waiting for NM to start..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
-
- rebootedNodeManager.stop();
- waitCount = 0;
- while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
- LOG.info("Waiting for NM to stop..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
- }
-
- @Test
public void testNMShutdownForRegistrationFailure() {
nm = new NodeManager() {
@@ -1108,12 +1063,6 @@ public class TestNodeStatusUpdater {
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
-
- @Override
- NodeManager createNewNodeManager() {
- rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
- return rebootedNodeManager;
- }
};
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Thu Apr 11 02:00:47 2013
@@ -73,13 +73,13 @@ public class ResourceTrackerService exte
private Server server;
private InetSocketAddress resourceTrackerAddress;
- private static final NodeHeartbeatResponse reboot = recordFactory
+ private static final NodeHeartbeatResponse resync = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
static {
- reboot.setNodeAction(NodeAction.REBOOT);
+ resync.setNodeAction(NodeAction.RESYNC);
shutDown.setNodeAction(NodeAction.SHUTDOWN);
}
@@ -220,7 +220,7 @@ public class ResourceTrackerService exte
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
- return reboot;
+ return resync;
}
// Send ping
@@ -250,7 +250,7 @@ public class ResourceTrackerService exte
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
- return reboot;
+ return resync;
}
// Heartbeat response
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Apr 11 02:00:47 2013
@@ -225,9 +225,9 @@ public class TestRMRestart {
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
- Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+ Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
- Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+ Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register
nm1 = rm2.registerNode("h1:1234", 15120);
@@ -235,9 +235,9 @@ public class TestRMRestart {
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
- Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+ Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
- Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+ Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
// assert app1 attempt is saved
attempt1 = loadedApp1.getCurrentAppAttempt();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Thu Apr 11 02:00:47 2013
@@ -282,7 +282,7 @@ public class TestResourceTrackerService
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
- Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
+ Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1466752&r1=1466751&r2=1466752&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Thu Apr 11 02:00:47 2013
@@ -130,6 +130,6 @@ public class TestRMNMRPCResponseId {
nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
- Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
+ Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
}
}
\ No newline at end of file