You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/27 14:44:57 UTC
svn commit: r1189723 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/s...
Author: vinodkv
Date: Thu Oct 27 12:44:57 2011
New Revision: 1189723
URL: http://svn.apache.org/viewvc?rev=1189723&view=rev
Log:
MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers. Contributed by Anupam Seth.
svn merge -c r1189721 --ignore-ancestry ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Oct 27 12:44:57 2011
@@ -383,6 +383,9 @@ Release 0.23.0 - Unreleased
virtual, allowing for a ratio between the two to be configurable. (todd
via acmurthy)
+ MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers.
+ (Anupam Seth via vinodkv)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Oct 27 12:44:57 2011
@@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends M
private JobHistoryServerWrapper historyServerWrapper;
public MiniMRYarnCluster(String testName) {
- super(testName);
+ this(testName, 1);
+ }
+
+ public MiniMRYarnCluster(String testName, int noOfNMs) {
+ super(testName, noOfNMs);
//TODO: add the history server
historyServerWrapper = new JobHistoryServerWrapper();
addService(historyServerWrapper);
@@ -80,7 +84,7 @@ public class MiniMRYarnCluster extends M
Service.class);
// Non-standard shuffle port
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Oct 27 12:44:57 2011
@@ -102,7 +102,7 @@ public class TestMRJobs {
}
if (mrCluster == null) {
- mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
+ mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
Configuration conf = new Configuration();
mrCluster.init(conf);
mrCluster.start();
@@ -322,7 +322,7 @@ public class TestMRJobs {
return job;
}
-//@Test
+ //@Test
public void testSleepJobWithSecurityOn() throws IOException,
InterruptedException, ClassNotFoundException {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Oct 27 12:44:57 2011
@@ -249,9 +249,14 @@ public class ShuffleHandler extends Abst
public synchronized void start() {
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+ HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+ bootstrap.setPipelineFactory(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+ Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ accepted.add(ch);
+ port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+ pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
super.start();
}
@@ -304,13 +309,17 @@ public class ShuffleHandler extends Abst
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
- private final int port;
+ private int port;
public Shuffle(Configuration conf) {
this.conf = conf;
indexCache = new IndexCache(new JobConf(conf));
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
}
+
+ public void setPort(int port) {
+ this.port = port;
+ }
private List<String> splitMaps(List<String> mapq) {
if (null == mapq) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Thu Oct 27 12:44:57 2011
@@ -89,8 +89,9 @@ public class DefaultContainerExecutor ex
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
+ LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
-
+ LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Oct 27 12:44:57 2011
@@ -235,8 +235,15 @@ public class ResourceLocalizationService
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
- LOG.info("Localizer started on port " + server.getPort());
server.start();
+ String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
+ .split(":")[0];
+ getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+ + server.getPort());
+ localizationServerAddress = NetUtils.createSocketAddr(
+ getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+ LOG.info("Localizer started on port " + server.getPort());
super.start();
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Thu Oct 27 12:44:57 2011
@@ -147,7 +147,7 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
- Configuration conf = new Configuration();
+ Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -331,7 +331,7 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
- Configuration conf = new Configuration();
+ Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Oct 27 12:44:57 2011
@@ -292,6 +292,7 @@ public class ApplicationMasterService ex
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.setResponseId(0);
+ LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Thu Oct 27 12:44:57 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service.STATE;
public class MiniYARNCluster extends CompositeService {
@@ -60,15 +62,19 @@ public class MiniYARNCluster extends Com
DefaultMetricsSystem.setMiniClusterMode(true);
}
- private NodeManager nodeManager;
+ private NodeManager[] nodeManagers;
private ResourceManager resourceManager;
private ResourceManagerWrapper resourceManagerWrapper;
- private NodeManagerWrapper nodeManagerWrapper;
private File testWorkDir;
public MiniYARNCluster(String testName) {
+ //default number of nodeManagers = 1
+ this(testName, 1);
+ }
+
+ public MiniYARNCluster(String testName, int noOfNodeManagers) {
super(testName);
this.testWorkDir = new File("target", testName);
try {
@@ -80,8 +86,11 @@ public class MiniYARNCluster extends Com
}
resourceManagerWrapper = new ResourceManagerWrapper();
addService(resourceManagerWrapper);
- nodeManagerWrapper = new NodeManagerWrapper();
- addService(nodeManagerWrapper);
+ nodeManagers = new CustomNodeManager[noOfNodeManagers];
+ for(int index = 0; index < noOfNodeManagers; index++) {
+ addService(new NodeManagerWrapper(index));
+ nodeManagers[index] = new CustomNodeManager();
+ }
}
public File getTestWorkDir() {
@@ -92,10 +101,10 @@ public class MiniYARNCluster extends Com
return this.resourceManager;
}
- public NodeManager getNodeManager() {
- return this.nodeManager;
+ public NodeManager getNodeManager(int i) {
+ return this.nodeManagers[i];
}
-
+
private class ResourceManagerWrapper extends AbstractService {
public ResourceManagerWrapper() {
super(ResourceManagerWrapper.class.getName());
@@ -145,106 +154,60 @@ public class MiniYARNCluster extends Com
}
private class NodeManagerWrapper extends AbstractService {
- public NodeManagerWrapper() {
- super(NodeManagerWrapper.class.getName());
+ int index = 0;
+
+ public NodeManagerWrapper(int i) {
+ super(NodeManagerWrapper.class.getName() + "_" + i);
+ index = i;
}
+ public synchronized void init(Configuration conf) {
+ Configuration config = new Configuration(conf);
+ super.init(config);
+ }
+
public synchronized void start() {
try {
- File localDir =
- new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir");
+ File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
+ + "-localDir-nm-" + index);
localDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath());
- getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+ getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
+ localDir.getAbsolutePath());
File logDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
- + "-logDir");
+ + "-logDir-nm-" + index);
File remoteLogDir =
- new File(testWorkDir, MiniYARNCluster.this.getName()
- + "-remoteLogDir");
+ new File(testWorkDir, MiniYARNCluster.this.getName()
+ + "-remoteLogDir-nm-" + index);
logDir.mkdir();
remoteLogDir.mkdir();
LOG.info("Created logDir in " + logDir.getAbsolutePath());
- getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+ getConfig().set(YarnConfiguration.NM_LOG_DIRS,
+ logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- remoteLogDir.getAbsolutePath());
- getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); // By default AM + 2 containers
- nodeManager = new NodeManager() {
-
- @Override
- protected void doSecureLogin() throws IOException {
- // Don't try to login using keytab in the testcase.
- };
-
- @Override
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- ContainerTokenSecretManager containerTokenSecretManager) {
- return new NodeStatusUpdaterImpl(context, dispatcher,
- healthChecker, metrics, containerTokenSecretManager) {
- @Override
- protected ResourceTracker getRMClient() {
- final ResourceTrackerService rt = resourceManager
- .getResourceTrackerService();
- final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- // For in-process communication without RPC
- return new ResourceTracker() {
-
- @Override
- public NodeHeartbeatResponse nodeHeartbeat(
- NodeHeartbeatRequest request) throws YarnRemoteException {
- NodeHeartbeatResponse response = recordFactory.newRecordInstance(
- NodeHeartbeatResponse.class);
- try {
- response.setHeartbeatResponse(rt.nodeHeartbeat(request)
- .getHeartbeatResponse());
- } catch (IOException ioe) {
- LOG.info("Exception in heartbeat from node " +
- request.getNodeStatus().getNodeId(), ioe);
- throw RPCUtil.getRemoteException(ioe);
- }
- return response;
- }
-
- @Override
- public RegisterNodeManagerResponse registerNodeManager(
- RegisterNodeManagerRequest request)
- throws YarnRemoteException {
- RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
- RegisterNodeManagerResponse.class);
- try {
- response.setRegistrationResponse(rt
- .registerNodeManager(request)
- .getRegistrationResponse());
- } catch (IOException ioe) {
- LOG.info("Exception in node registration from "
- + request.getNodeId().toString(), ioe);
- throw RPCUtil.getRemoteException(ioe);
- }
- return response;
- }
- };
- };
- };
- };
- };
- nodeManager.init(getConfig());
+ remoteLogDir.getAbsolutePath());
+ // By default AM + 2 containers
+ getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
+ getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
+ getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+ getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
+ LOG.info("Starting NM: " + index);
+ nodeManagers[index].init(getConfig());
new Thread() {
public void run() {
- nodeManager.start();
+ nodeManagers[index].start();
};
}.start();
int waitCount = 0;
- while (nodeManager.getServiceState() == STATE.INITED
+ while (nodeManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
- LOG.info("Waiting for NM to start...");
+ LOG.info("Waiting for NM " + index + " to start...");
Thread.sleep(1000);
}
- if (nodeManager.getServiceState() != STATE.STARTED) {
+ if (nodeManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
- throw new IOException("NodeManager failed to start");
+ throw new IOException("NodeManager " + index + " failed to start");
}
super.start();
} catch (Throwable t) {
@@ -254,10 +217,71 @@ public class MiniYARNCluster extends Com
@Override
public synchronized void stop() {
- if (nodeManager != null) {
- nodeManager.stop();
+ if (nodeManagers[index] != null) {
+ nodeManagers[index].stop();
}
super.stop();
}
}
+
+ private class CustomNodeManager extends NodeManager {
+ @Override
+ protected void doSecureLogin() throws IOException {
+ // Don't try to login using keytab in the testcase.
+ };
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ return new NodeStatusUpdaterImpl(context, dispatcher,
+ healthChecker, metrics, containerTokenSecretManager) {
+ @Override
+ protected ResourceTracker getRMClient() {
+ final ResourceTrackerService rt = resourceManager
+ .getResourceTrackerService();
+ final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ // For in-process communication without RPC
+ return new ResourceTracker() {
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(
+ NodeHeartbeatRequest request) throws YarnRemoteException {
+ NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+ NodeHeartbeatResponse.class);
+ try {
+ response.setHeartbeatResponse(rt.nodeHeartbeat(request)
+ .getHeartbeatResponse());
+ } catch (IOException ioe) {
+ LOG.info("Exception in heartbeat from node " +
+ request.getNodeStatus().getNodeId(), ioe);
+ throw RPCUtil.getRemoteException(ioe);
+ }
+ return response;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request)
+ throws YarnRemoteException {
+ RegisterNodeManagerResponse response = recordFactory.
+ newRecordInstance(RegisterNodeManagerResponse.class);
+ try {
+ response.setRegistrationResponse(rt
+ .registerNodeManager(request)
+ .getRegistrationResponse());
+ } catch (IOException ioe) {
+ LOG.info("Exception in node registration from "
+ + request.getNodeId().toString(), ioe);
+ throw RPCUtil.getRemoteException(ioe);
+ }
+ return response;
+ }
+ };
+ };
+ };
+ };
+ }
}