You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2011/09/29 02:10:04 UTC
svn commit: r1177117 [2/7] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apa...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Sep 29 00:09:56 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,7 +61,6 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@@ -103,6 +101,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -117,7 +116,6 @@ import org.apache.hadoop.yarn.state.Inva
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -153,7 +151,7 @@ public abstract class TaskAttemptImpl im
private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null;
- private final Object classpathLock = new Object();
+ private static final Object classpathLock = new Object();
private long launchTime;
private long finishTime;
private WrappedProgressSplitsBlock progressSplitBlock;
@@ -518,8 +516,8 @@ public abstract class TaskAttemptImpl im
return initialClasspath;
}
Map<String, String> env = new HashMap<String, String>();
- MRApps.setInitialClasspath(env);
- initialClasspath = env.get(MRApps.CLASSPATH);
+ MRApps.setClasspath(env);
+ initialClasspath = env.get(Environment.CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
}
@@ -531,16 +529,18 @@ public abstract class TaskAttemptImpl im
*/
private ContainerLaunchContext createContainerLaunchContext() {
- ContainerLaunchContext container =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
// Application resources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
// Application environment
Map<String, String> environment = new HashMap<String, String>();
-
+
+ // Service data
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+ // Tokens
+ ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
try {
FileSystem remoteFS = FileSystem.get(conf);
@@ -550,7 +550,7 @@ public abstract class TaskAttemptImpl im
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
localResources.put(
- MRConstants.JOB_JAR,
+ MRJobConfig.JOB_JAR,
createLocalResource(remoteFS, recordFactory, remoteJobJar,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-jar file on the remote FS is "
@@ -570,9 +570,9 @@ public abstract class TaskAttemptImpl im
Path remoteJobSubmitDir =
new Path(path, oldJobId.toString());
Path remoteJobConfPath =
- new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
+ new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
localResources.put(
- MRConstants.JOB_CONF_FILE,
+ MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is "
@@ -580,12 +580,8 @@ public abstract class TaskAttemptImpl im
// //////////// End of JobConf setup
// Setup DistributedCache
- MRApps.setupDistributedCache(conf, localResources, environment);
+ MRApps.setupDistributedCache(conf, localResources);
- // Set local-resources and environment
- container.setLocalResources(localResources);
- container.setEnvironment(environment);
-
// Setup up tokens
Credentials taskCredentials = new Credentials();
@@ -606,52 +602,43 @@ public abstract class TaskAttemptImpl im
LOG.info("Size of containertokens_dob is "
+ taskCredentials.numberOfTokens());
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
- container.setContainerTokens(
+ tokens =
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
- containerTokens_dob.getLength()));
+ containerTokens_dob.getLength());
// Add shuffle token
LOG.info("Putting shuffle token in serviceData");
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(jobToken));
- container.setServiceData(serviceData);
- MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
+ MRApps.addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
+ getInitialClasspath());
} catch (IOException e) {
throw new YarnException(e);
}
-
- container.setContainerId(containerID);
- container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix
-
- File workDir = new File("$PWD"); // Will be expanded by the shell.
- String containerLogDir =
- new File(ApplicationConstants.LOG_DIR_EXPANSION_VAR).toString();
- String childTmpDir = new File(workDir, "tmp").toString();
- String javaHome = "${JAVA_HOME}"; // Will be expanded by the shell.
- String nmLdLibraryPath = "{LD_LIBRARY_PATH}"; // Expanded by the shell?
- List<String> classPaths = new ArrayList<String>();
-
- String localizedApplicationTokensFile =
- new File(workDir, MRConstants.APPLICATION_TOKENS_FILE).toString();
- classPaths.add(MRConstants.JOB_JAR);
- classPaths.add(MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
- classPaths.add(workDir.toString()); // TODO
- // Construct the actual Container
- container.setCommands(MapReduceChildJVM.getVMCommand(
- taskAttemptListener.getAddress(), remoteTask, javaHome,
- workDir.toString(), containerLogDir, childTmpDir, jvmID));
-
- MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
- workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
- localizedApplicationTokensFile);
+ // Setup environment
+ MapReduceChildJVM.setVMEnv(environment, remoteTask);
+ // Set up the launch command
+ List<String> commands = MapReduceChildJVM.getVMCommand(
+ taskAttemptListener.getAddress(), remoteTask,
+ jvmID);
+
// Construct the actual Container
+ ContainerLaunchContext container =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
container.setContainerId(containerID);
container.setUser(conf.get(MRJobConfig.USER_NAME));
container.setResource(assignedCapability);
+ container.setLocalResources(localResources);
+ container.setEnvironment(environment);
+ container.setCommands(commands);
+ container.setServiceData(serviceData);
+ container.setContainerTokens(tokens);
+
return container;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Sep 29 00:09:56 2011
@@ -73,6 +73,8 @@ public class ContainerLauncherImpl exten
private AppContext context;
private ThreadPoolExecutor launcherPool;
+ private static final int INITIAL_POOL_SIZE = 10;
+ private int limitOnPoolSize;
private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
@@ -96,16 +98,17 @@ public class ContainerLauncherImpl exten
YarnConfiguration.YARN_SECURITY_INFO,
ContainerManagerSecurityInfo.class, SecurityInfo.class);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+ this.limitOnPoolSize = conf.getInt(
+ MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+ MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
super.init(myLocalConfig);
}
public void start() {
- launcherPool =
- new ThreadPoolExecutor(getConfig().getInt(
- MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT, 10),
- Integer.MAX_VALUE, 1, TimeUnit.HOURS,
- new LinkedBlockingQueue<Runnable>());
- launcherPool.prestartAllCoreThreads(); // Wait for work.
+ // Start with a default core-pool size of 10 and change it dynamically.
+ launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+ Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+ new LinkedBlockingQueue<Runnable>());
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -117,6 +120,26 @@ public class ContainerLauncherImpl exten
LOG.error("Returning, interrupted : " + e);
return;
}
+
+ int poolSize = launcherPool.getCorePoolSize();
+
+ // See if we need to bump up the pool size, but only if we haven't
+ // reached the maximum limit yet.
+ if (poolSize != limitOnPoolSize) {
+
+ // Number of nodes where containers will run at *this* point in time.
+ // This is *not* the cluster size and doesn't need to be.
+ int numNodes = ugiMap.size();
+ int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+ if (poolSize <= idealPoolSize) {
+ // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
+ // latter is just a buffer so we are not always increasing the
+ // pool-size
+ launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
+ }
+ }
+
// the events from the queue are handled in parallel
// using a thread pool
launcherPool.execute(new EventProcessor(event));
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Thu Sep 29 00:09:56 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.local;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -30,15 +31,19 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -66,6 +71,20 @@ public class LocalContainerAllocator ext
}
@Override
+ protected synchronized void heartbeat() throws Exception {
+ AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+ this.applicationAttemptId, this.lastResponseID, super
+ .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+ AMResponse response = allocateResponse.getAMResponse();
+ if (response.getReboot()) {
+ // TODO
+ LOG.info("Event from RM: shutting down Application Master");
+ }
+ }
+
+ @Override
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
LOG.info("Processing the event " + event.toString());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Sep 29 00:09:56 2011
@@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -92,10 +92,9 @@ public class RecoveryService extends Com
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
- private final ApplicationId appID;
+ private final ApplicationAttemptId applicationAttemptId;
private final Dispatcher dispatcher;
private final ControlledClock clock;
- private final int startCount;
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
@@ -106,10 +105,10 @@ public class RecoveryService extends Com
private volatile boolean recoveryMode = false;
- public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+ public RecoveryService(ApplicationAttemptId applicationAttemptId,
+ Clock clock) {
super("RecoveringDispatcher");
- this.appID = appID;
- this.startCount = startCount;
+ this.applicationAttemptId = applicationAttemptId;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@@ -152,7 +151,8 @@ public class RecoveryService extends Com
private void parse() throws IOException {
// TODO: parse history file based on startCount
- String jobName = TypeConverter.fromYarn(appID).toString();
+ String jobName =
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
@@ -160,8 +160,9 @@ public class RecoveryService extends Com
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
+ //read the previous history file
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
- histDirPath, jobName, startCount - 1)); //read the previous history file
+ histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Sep 29 00:09:56 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.io.IOException;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -42,17 +42,12 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -64,7 +59,7 @@ import org.apache.hadoop.yarn.service.Ab
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
-public class RMCommunicator extends AbstractService {
+public abstract class RMCommunicator extends AbstractService {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis
protected ApplicationId applicationId;
@@ -74,7 +69,7 @@ public class RMCommunicator extends Abst
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
private final ClientService clientService;
- private int lastResponseID;
+ protected int lastResponseID;
private Resource minContainerCapability;
private Resource maxContainerCapability;
@@ -121,6 +116,34 @@ public class RMCommunicator extends Abst
return job;
}
+ /**
+ * Get the appProgress. Can be used only after this component is started.
+ * @return the appProgress.
+ */
+ protected float getApplicationProgress() {
+ // For now just a single job. In future when we have a DAG, we need an
+ // aggregate progress.
+ JobReport report = this.job.getReport();
+ float setupWeight = 0.05f;
+ float cleanupWeight = 0.05f;
+ float mapWeight = 0.0f;
+ float reduceWeight = 0.0f;
+ int numMaps = this.job.getTotalMaps();
+ int numReduces = this.job.getTotalReduces();
+ if (numMaps == 0 && numReduces == 0) {
+ } else if (numMaps == 0) {
+ reduceWeight = 0.9f;
+ } else if (numReduces == 0) {
+ mapWeight = 0.9f;
+ } else {
+ mapWeight = reduceWeight = 0.45f;
+ }
+ return (report.getSetupProgress() * setupWeight
+ + report.getCleanupProgress() * cleanupWeight
+ + report.getMapProgress() * mapWeight + report.getReduceProgress()
+ * reduceWeight);
+ }
+
protected void register() {
//Register
String host =
@@ -262,18 +285,5 @@ public class RMCommunicator extends Abst
});
}
- protected synchronized void heartbeat() throws Exception {
- AllocateRequest allocateRequest =
- recordFactory.newRecordInstance(AllocateRequest.class);
- allocateRequest.setApplicationAttemptId(applicationAttemptId);
- allocateRequest.setResponseId(lastResponseID);
- allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
- allocateRequest.addAllReleases(new ArrayList<ContainerId>());
- AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- AMResponse response = allocateResponse.getAMResponse();
- if (response.getReboot()) {
- LOG.info("Event from RM: shutting down Application Master");
- }
- }
-
+ protected abstract void heartbeat() throws Exception;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Sep 29 00:09:56 2011
@@ -586,37 +586,21 @@ public class RMContainerAllocator extend
private ContainerRequest assign(Container allocated) {
ContainerRequest assigned = null;
- if (mapResourceReqt != reduceResourceReqt) {
- //assign based on size
- LOG.info("Assigning based on container size");
- if (allocated.getResource().getMemory() == mapResourceReqt) {
- assigned = assignToFailedMap(allocated);
- if (assigned == null) {
- assigned = assignToMap(allocated);
- }
- } else if (allocated.getResource().getMemory() == reduceResourceReqt) {
- assigned = assignToReduce(allocated);
- }
-
- return assigned;
- }
-
- //container can be given to either map or reduce
- //assign based on priority
-
- //try to assign to earlierFailedMaps if present
- assigned = assignToFailedMap(allocated);
-
- //Assign to reduces before assigning to maps ?
- if (assigned == null) {
+ Priority priority = allocated.getPriority();
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+ LOG.info("Assigning container " + allocated + " to fast fail map");
+ assigned = assignToFailedMap(allocated);
+ } else if (PRIORITY_REDUCE.equals(priority)) {
+ LOG.info("Assigning container " + allocated + " to reduce");
assigned = assignToReduce(allocated);
- }
-
- //try to assign to maps if present
- if (assigned == null) {
+ } else if (PRIORITY_MAP.equals(priority)) {
+ LOG.info("Assigning container " + allocated + " to map");
assigned = assignToMap(allocated);
+ } else {
+ LOG.warn("Container allocated at unwanted priority: " + priority +
+ ". Returning to RM...");
}
-
+
return assigned;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Sep 29 00:09:56 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Keeps the data structures to send container requests to RM.
@@ -107,15 +108,11 @@ public abstract class RMContainerRequest
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
}
- protected abstract void heartbeat() throws Exception;
-
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
- AllocateRequest allocateRequest = recordFactory
- .newRecordInstance(AllocateRequest.class);
- allocateRequest.setApplicationAttemptId(applicationAttemptId);
- allocateRequest.setResponseId(lastResponseID);
- allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
- allocateRequest.addAllReleases(new ArrayList<ContainerId>(release));
+ AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+ applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+ new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
+ release));
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Sep 29 00:09:56 2011
@@ -35,7 +35,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -87,7 +86,7 @@ public class DefaultSpeculator extends A
private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
= new ConcurrentHashMap<JobId, AtomicInteger>();
- private final Set<TaskId> mayHaveSpeculated = new HashSet();
+ private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
private final Configuration conf;
private AppContext context;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java Thu Sep 29 00:09:56 2011
@@ -44,6 +44,7 @@ public class JobConfPage extends AppView
set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
: join("Configuration for MapReduce Job ", $(JOB_ID)));
commonPreHead(html);
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
set(DATATABLES_ID, "conf");
set(initID(DATATABLES, "conf"), confTableInit());
set(postInitID(DATATABLES, "conf"), confPostTableInit());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java Thu Sep 29 00:09:56 2011
@@ -38,9 +38,9 @@ public class NavBlock extends HtmlBlock
div("#nav").
h3("Cluster").
ul().
- li().a(url(rmweb, prefix(), "cluster"), "About")._().
- li().a(url(rmweb, prefix(), "apps"), "Applications")._().
- li().a(url(rmweb, prefix(), "scheduler"), "Scheduler")._()._().
+ li().a(url(rmweb, "cluster", "cluster"), "About")._().
+ li().a(url(rmweb, "cluster", "apps"), "Applications")._().
+ li().a(url(rmweb, "cluster", "scheduler"), "Scheduler")._()._().
h3("Application").
ul().
li().a(url("app/info"), "About")._().
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Thu Sep 29 00:09:56 2011
@@ -85,7 +85,7 @@ public class TaskPage extends AppView {
if (containerId != null) {
String containerIdStr = ConverterUtils.toString(containerId);
nodeTd._(" ").
- a(".logslink", url("http://", nodeHttpAddr, "yarn", "containerlogs",
+ a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
containerIdStr), "logs");
}
nodeTd._().
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Sep 29 00:09:56 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -91,7 +92,7 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
- private final RecordFactory recordFactory =
+ private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//if true, tasks complete automatically as soon as they are launched
@@ -100,7 +101,7 @@ public class MRApp extends MRAppMaster {
static ApplicationId applicationId;
static {
- applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ applicationId = recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
}
@@ -108,9 +109,19 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
+
+ private static ApplicationAttemptId getApplicationAttemptId(
+ ApplicationId applicationId, int startCount) {
+ ApplicationAttemptId applicationAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ applicationAttemptId.setApplicationId(applicationId);
+ applicationAttemptId.setAttemptId(startCount);
+ return applicationAttemptId;
+ }
- public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
- super(applicationId, startCount);
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount) {
+ super(getApplicationAttemptId(applicationId, startCount));
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@@ -391,11 +402,12 @@ public class MRApp extends MRAppMaster {
return localStateMachine;
}
- public TestJob(Configuration conf, ApplicationId appID,
+ public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
- super(appID, conf, eventHandler, taskAttemptListener,
- new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
+ super(getApplicationAttemptId(applicationId, getStartCount()),
+ conf, eventHandler, taskAttemptListener,
+ new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, user);
// This "this leak" is okay because the retained pointer is in an
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Sep 29 00:09:56 2011
@@ -18,12 +18,15 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import junit.framework.Assert;
@@ -32,475 +35,651 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
import org.junit.Test;
public class TestRMContainerAllocator {
-// private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
-// private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-//
-// @BeforeClass
-// public static void preTests() {
-// DefaultMetricsSystem.shutdown();
-// }
-//
-// @Test
-// public void testSimple() throws Exception {
-// FifoScheduler scheduler = createScheduler();
-// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-// scheduler, new Configuration());
-//
-// //add resources to scheduler
-// RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-// RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-// RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-// //create the container request
-// ContainerRequestEvent event1 =
-// createReq(1, 1024, new String[]{"h1"});
-// allocator.sendRequest(event1);
-//
-// //send 1 more request with different resource req
-// ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"});
-// allocator.sendRequest(event2);
-//
-// //this tells the scheduler about the requests
-// //as nodes are not added, no allocations
-// List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-// Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-// //send another request with different resource and priority
-// ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"});
-// allocator.sendRequest(event3);
-//
-// //this tells the scheduler about the requests
-// //as nodes are not added, no allocations
-// assigned = allocator.schedule();
-// Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-// //update resources in scheduler
-// scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//
-// assigned = allocator.schedule();
-// checkAssignments(
-// new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
-// }
-//
-// //TODO: Currently Scheduler seems to have bug where it does not work
-// //for Application asking for containers with different capabilities.
-// //@Test
-// public void testResource() throws Exception {
-// FifoScheduler scheduler = createScheduler();
-// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-// scheduler, new Configuration());
-//
-// //add resources to scheduler
-// RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-// RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-// RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-// //create the container request
-// ContainerRequestEvent event1 =
-// createReq(1, 1024, new String[]{"h1"});
-// allocator.sendRequest(event1);
-//
-// //send 1 more request with different resource req
-// ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"});
-// allocator.sendRequest(event2);
-//
-// //this tells the scheduler about the requests
-// //as nodes are not added, no allocations
-// List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-// Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-// //update resources in scheduler
-// scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-// assigned = allocator.schedule();
-// checkAssignments(
-// new ContainerRequestEvent[]{event1, event2}, assigned, false);
-// }
-//
-// @Test
-// public void testMapReduceScheduling() throws Exception {
-// FifoScheduler scheduler = createScheduler();
-// Configuration conf = new Configuration();
-// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-// scheduler, conf);
-//
-// //add resources to scheduler
-// RMNode nodeManager1 = addNode(scheduler, "h1", 1024);
-// RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-// RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-// //create the container request
-// //send MAP request
-// ContainerRequestEvent event1 =
-// createReq(1, 2048, new String[]{"h1", "h2"}, true, false);
-// allocator.sendRequest(event1);
-//
-// //send REDUCE request
-// ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true);
-// allocator.sendRequest(event2);
-//
-// //send MAP request
-// ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false);
-// allocator.sendRequest(event3);
-//
-// //this tells the scheduler about the requests
-// //as nodes are not added, no allocations
-// List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-// Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-// //update resources in scheduler
-// scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-// scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-// assigned = allocator.schedule();
-// checkAssignments(
-// new ContainerRequestEvent[]{event1, event3}, assigned, false);
-//
-// //validate that no container is assigned to h1 as it doesn't have 2048
-// for (TaskAttemptContainerAssignedEvent assig : assigned) {
-// Assert.assertFalse("Assigned count not correct",
-// "h1".equals(assig.getContainer().getNodeId().getHost()));
-// }
-// }
-//
-//
-//
-// private RMNode addNode(FifoScheduler scheduler,
-// String nodeName, int memory) {
-// NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-// nodeId.setHost(nodeName);
-// nodeId.setPort(1234);
-// Resource resource = recordFactory.newRecordInstance(Resource.class);
-// resource.setMemory(memory);
-// RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0,
-// ResourceTrackerService.resolve(nodeName), resource);
-// scheduler.addNode(nodeManager); // Node registration
-// return nodeManager;
-// }
-//
-// private FifoScheduler createScheduler() throws YarnRemoteException {
-// FifoScheduler fsc = new FifoScheduler() {
-// //override this to copy the objects
-// //otherwise FifoScheduler updates the numContainers in same objects as kept by
-// //RMContainerAllocator
-//
-// @Override
-// public synchronized void allocate(ApplicationAttemptId applicationId,
-// List<ResourceRequest> ask) {
-// List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
-// for (ResourceRequest req : ask) {
-// ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class);
-// reqCopy.setPriority(req.getPriority());
-// reqCopy.setHostName(req.getHostName());
-// reqCopy.setCapability(req.getCapability());
-// reqCopy.setNumContainers(req.getNumContainers());
-// askCopy.add(reqCopy);
-// }
-// super.allocate(applicationId, askCopy);
-// }
-// };
-// try {
-// fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
-// fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
-// recordFactory.newRecordInstance(ApplicationMaster.class),
-// "test", null, null, StoreFactory.createVoidAppStore());
-// } catch(IOException ie) {
-// LOG.info("add application failed with ", ie);
-// assert(false);
-// }
-// return fsc;
-// }
-//
-// private ContainerRequestEvent createReq(
-// int attemptid, int memory, String[] hosts) {
-// return createReq(attemptid, memory, hosts, false, false);
-// }
-//
-// private ContainerRequestEvent createReq(
-// int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-// ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
-// appId.setClusterTimestamp(0);
-// appId.setId(0);
-// JobId jobId = recordFactory.newRecordInstance(JobId.class);
-// jobId.setAppId(appId);
-// jobId.setId(0);
-// TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
-// taskId.setId(0);
-// taskId.setJobId(jobId);
-// if (reduce) {
-// taskId.setTaskType(TaskType.REDUCE);
-// } else {
-// taskId.setTaskType(TaskType.MAP);
-// }
-// TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
-// attemptId.setId(attemptid);
-// attemptId.setTaskId(taskId);
-// Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
-// containerNeed.setMemory(memory);
-// if (earlierFailedAttempt) {
-// return ContainerRequestEvent.
-// createContainerRequestEventForFailedContainer(attemptId, containerNeed);
-// }
-// return new ContainerRequestEvent(attemptId,
-// containerNeed,
-// hosts, new String[] {NetworkTopology.DEFAULT_RACK});
-// }
-//
-// private void checkAssignments(ContainerRequestEvent[] requests,
-// List<TaskAttemptContainerAssignedEvent> assignments,
-// boolean checkHostMatch) {
-// Assert.assertNotNull("Container not assigned", assignments);
-// Assert.assertEquals("Assigned count not correct",
-// requests.length, assignments.size());
-//
-// //check for uniqueness of containerIDs
-// Set<ContainerId> containerIds = new HashSet<ContainerId>();
-// for (TaskAttemptContainerAssignedEvent assigned : assignments) {
-// containerIds.add(assigned.getContainer().getId());
-// }
-// Assert.assertEquals("Assigned containers must be different",
-// assignments.size(), containerIds.size());
-//
-// //check for all assignment
-// for (ContainerRequestEvent req : requests) {
-// TaskAttemptContainerAssignedEvent assigned = null;
-// for (TaskAttemptContainerAssignedEvent ass : assignments) {
-// if (ass.getTaskAttemptID().equals(req.getAttemptID())){
-// assigned = ass;
-// break;
-// }
-// }
-// checkAssignment(req, assigned, checkHostMatch);
-// }
-// }
-//
-// private void checkAssignment(ContainerRequestEvent request,
-// TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
-// Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
-// assigned);
-// Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
-// assigned.getTaskAttemptID());
-// if (checkHostMatch) {
-// Assert.assertTrue("Not assigned to requested host", Arrays.asList(
-// request.getHosts()).contains(
-// assigned.getContainer().getNodeId().toString()));
-// }
-//
-// }
-//
-// //Mock RMContainerAllocator
-// //Instead of talking to remote Scheduler,uses the local Scheduler
-// public static class LocalRMContainerAllocator extends RMContainerAllocator {
-// private static final List<TaskAttemptContainerAssignedEvent> events =
-// new ArrayList<TaskAttemptContainerAssignedEvent>();
-//
-// public static class AMRMProtocolImpl implements AMRMProtocol {
-//
-// private ResourceScheduler resourceScheduler;
-//
-// public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
-// this.resourceScheduler = resourceScheduler;
-// }
-//
-// @Override
-// public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
-// RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
-// return response;
-// }
-//
-// public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
-// List<ResourceRequest> ask = request.getAskList();
-// List<Container> release = request.getReleaseList();
-// try {
-// AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-// Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask);
-// response.addAllNewContainers(allocation.getContainers());
-// response.setAvailableResources(allocation.getResourceLimit());
-// AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
-// allocateResponse.setAMResponse(response);
-// return allocateResponse;
-// } catch(IOException ie) {
-// throw RPCUtil.getRemoteException(ie);
-// }
-// }
-//
-// @Override
-// public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
-// FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
-// return response;
-// }
-//
-// }
-//
-// private ResourceScheduler scheduler;
-// LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) {
-// super(null, new TestContext(events));
-// this.scheduler = scheduler;
-// super.init(conf);
-// super.start();
-// }
-//
-// protected AMRMProtocol createSchedulerProxy() {
-// return new AMRMProtocolImpl(scheduler);
-// }
-//
-// @Override
-// protected void register() {}
-// @Override
-// protected void unregister() {}
-//
-// @Override
-// protected Resource getMinContainerCapability() {
-// Resource res = recordFactory.newRecordInstance(Resource.class);
-// res.setMemory(1024);
-// return res;
-// }
-//
-// @Override
-// protected Resource getMaxContainerCapability() {
-// Resource res = recordFactory.newRecordInstance(Resource.class);
-// res.setMemory(10240);
-// return res;
-// }
-//
-// public void sendRequest(ContainerRequestEvent req) {
-// sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
-// }
-//
-// public void sendRequests(List<ContainerRequestEvent> reqs) {
-// for (ContainerRequestEvent req : reqs) {
-// handle(req);
-// }
-// }
-//
-// //API to be used by tests
-// public List<TaskAttemptContainerAssignedEvent> schedule() {
-// //run the scheduler
-// try {
-// heartbeat();
-// } catch (Exception e) {
-// LOG.error("error in heartbeat ", e);
-// throw new YarnException(e);
-// }
-//
-// List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
-// events.clear();
-// return result;
-// }
-//
-// protected void startAllocatorThread() {
-// //override to NOT start thread
-// }
-//
-// static class TestContext implements AppContext {
-// private List<TaskAttemptContainerAssignedEvent> events;
-// TestContext(List<TaskAttemptContainerAssignedEvent> events) {
-// this.events = events;
-// }
-// @Override
-// public Map<JobId, Job> getAllJobs() {
-// return null;
-// }
-// @Override
-// public ApplicationAttemptId getApplicationAttemptId() {
-// return recordFactory.newRecordInstance(ApplicationAttemptId.class);
-// }
-// @Override
-// public ApplicationId getApplicationID() {
-// return recordFactory.newRecordInstance(ApplicationId.class);
-// }
-// @Override
-// public EventHandler getEventHandler() {
-// return new EventHandler() {
-// @Override
-// public void handle(Event event) {
-// events.add((TaskAttemptContainerAssignedEvent) event);
-// }
-// };
-// }
-// @Override
-// public Job getJob(JobId jobID) {
-// return null;
-// }
-//
-// @Override
-// public String getUser() {
-// return null;
-// }
-//
-// @Override
-// public Clock getClock() {
-// return null;
-// }
-//
-// @Override
-// public String getApplicationName() {
-// return null;
-// }
-//
-// @Override
-// public long getStartTime() {
-// return 0;
-// }
-// }
-// }
-//
-// public static void main(String[] args) throws Exception {
-// TestRMContainerAllocator t = new TestRMContainerAllocator();
-// t.testSimple();
-// //t.testResource();
-// t.testMapReduceScheduling();
-// }
+
+ static final Log LOG = LogFactory
+ .getLog(TestRMContainerAllocator.class);
+ static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ @After
+ public void tearDown() {
+ DefaultMetricsSystem.shutdown();
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+
+ LOG.info("Running testSimple");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+ 0, 0, 0, 0, 0, 0));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+ MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ // create the container request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ // send 1 more request with different resource req
+ ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+ new String[] { "h2" });
+ allocator.sendRequest(event2);
+
+ // this tells the scheduler about the requests
+ // the nodes are registered but have not sent a heartbeat yet, so no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // send another request with different resource and priority
+ ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+ new String[] { "h3" });
+ allocator.sendRequest(event3);
+
+ // this tells the scheduler about the requests
+ // the nodes are registered but have not sent a heartbeat yet, so no allocations
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ nodeManager2.nodeHeartbeat(true); // Node heartbeat
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+ assigned, false);
+ }
+
+ @Test
+ public void testResource() throws Exception {
+
+ LOG.info("Running testResource");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+ 0, 0, 0, 0, 0, 0));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+ MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ // create the container request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ // send 1 more request with different resource req
+ ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
+ new String[] { "h2" });
+ allocator.sendRequest(event2);
+
+ // this tells the scheduler about the requests
+ // the nodes are registered but have not sent a heartbeat yet, so no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ nodeManager2.nodeHeartbeat(true); // Node heartbeat
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+ assigned, false);
+ }
+
+ /**
+ * Sends a map request for an earlier failed attempt, a reduce request and a
+ * regular map request, and expects exactly the two map requests to be
+ * assigned once the nodes have heartbeated, neither of them on h1.
+ */
+ @Test
+ public void testMapReduceScheduling() throws Exception {
+
+ LOG.info("Running testMapReduceScheduling");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+ 0, 0, 0, 0, 0, 0));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
+ MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ // create the container requests
+ // send MAP request for an earlier failed attempt; createReq() does not
+ // pass the host list on for this kind of request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
+ "h1", "h2" }, true, false);
+ allocator.sendRequest(event1);
+
+ // send REDUCE request
+ ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
+ new String[] { "h1" }, false, true);
+ allocator.sendRequest(event2);
+
+ // send MAP request
+ ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
+ new String[] { "h3" }, false, false);
+ allocator.sendRequest(event3);
+
+ // this tells the scheduler about the requests
+ // the nodes are registered but have not heartbeated since, so no
+ // allocations are expected yet
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ nodeManager2.nodeHeartbeat(true); // Node heartbeat
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ // only the two map requests are expected to be served; the reduce request
+ // (event2) must not show up among the assignments
+ assigned = allocator.schedule();
+ dispatcher.await();
+ checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+ assigned, false);
+
+ // validate that no container is assigned to h1 as it doesn't have 2048
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ Assert.assertFalse("Container must not be assigned to h1", "h1"
+ .equals(assig.getContainer().getNodeId().getHost()));
+ }
+ }
+
+ /**
+ * MockRM used by these tests: it dispatches through a DrainDispatcher,
+ * hands scheduler events to the scheduler inline and schedules with a
+ * MyFifoScheduler.
+ */
+ private static class MyResourceManager extends MockRM {
+
+ public MyResourceManager(Configuration conf) {
+ super(conf);
+ }
+
+ /** Returns a DrainDispatcher; the tests cast to it and call await(). */
+ @Override
+ protected Dispatcher createDispatcher() {
+ Dispatcher drainable = new DrainDispatcher();
+ return drainable;
+ }
+
+ /** Returns a handler that passes every event straight to the scheduler. */
+ @Override
+ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+ // Dispatch inline for test sanity
+ EventHandler<SchedulerEvent> inlineHandler =
+ new EventHandler<SchedulerEvent>() {
+ @Override
+ public void handle(SchedulerEvent schedulerEvent) {
+ scheduler.handle(schedulerEvent);
+ }
+ };
+ return inlineHandler;
+ }
+
+ /** Creates the test scheduler from this resource manager's context. */
+ @Override
+ protected ResourceScheduler createScheduler() {
+ ResourceScheduler fifo = new MyFifoScheduler(getRMContext());
+ return fifo;
+ }
+ }
+
+ /**
+ * JobImpl stand-in for the progress tests: the task counts are fixed at
+ * construction and the four progress values reported by getReport() are
+ * whatever the test last passed to setProgress().
+ */
+ private static class FakeJob extends JobImpl {
+
+ private final int numMaps;
+ private final int numReduces;
+ private JobId jobId;
+ private float setupProgress;
+ private float mapProgress;
+ private float reduceProgress;
+ private float cleanupProgress;
+
+ /**
+ * Builds the job with id 0 of the attempt's application; everything the
+ * JobImpl constructor takes besides the attempt id and the configuration
+ * is passed as null.
+ */
+ public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
+ int numMaps, int numReduces) {
+ super(appAttemptID, conf, null, null, null, null, null, null, null,
+ null);
+ this.numMaps = numMaps;
+ this.numReduces = numReduces;
+ this.jobId = MRBuilderUtils
+ .newJobId(appAttemptID.getApplicationId(), 0);
+ }
+
+ /** Stores the progress values that the next getReport() will carry. */
+ void setProgress(float setupProgress, float mapProgress,
+ float reduceProgress, float cleanupProgress) {
+ this.cleanupProgress = cleanupProgress;
+ this.reduceProgress = reduceProgress;
+ this.mapProgress = mapProgress;
+ this.setupProgress = setupProgress;
+ }
+
+ @Override
+ public int getTotalMaps() {
+ return numMaps;
+ }
+
+ @Override
+ public int getTotalReduces() {
+ return numReduces;
+ }
+
+ /** Builds a fresh RUNNING report from the currently stored progress. */
+ @Override
+ public JobReport getReport() {
+ JobReport report = MRBuilderUtils.newJobReport(jobId, "job", "user",
+ JobState.RUNNING, 0, 0, setupProgress, mapProgress, reduceProgress,
+ cleanupProgress);
+ return report;
+ }
+ }
+
+ /**
+ * Checks the application progress seen on the RM side after each allocator
+ * heartbeat, for a FakeJob with 2 maps and 2 reduces whose progress values
+ * are set directly by the test.
+ *
+ * NOTE(review): the expected numbers are consistent with setup and cleanup
+ * weighing 5% each and the remaining 90% being split evenly between maps
+ * and reduces. That is inferred from the asserted values only; confirm
+ * against RMContainerAllocator.
+ */
+ @Test
+ public void testReportedAppProgress() throws Exception {
+
+ LOG.info("Running testReportedAppProgress");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ // Register a node for the AM and let it heartbeat once
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ // Job with 2 maps and 2 reduces
+ FakeJob job = new FakeJob(appAttemptId, conf, 2, 2);
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, job);
+
+ // No progress has been set on the job yet
+ allocator.schedule(); // Send heartbeat
+ dispatcher.await();
+ Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
+ // Arguments of setProgress: setup, map, reduce, cleanup
+ job.setProgress(100, 10, 0, 0);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(9.5f, app.getProgress(), 0.0);
+
+ job.setProgress(100, 80, 0, 0);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(41.0f, app.getProgress(), 0.0);
+
+ job.setProgress(100, 100, 20, 0);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+ // Everything done
+ job.setProgress(100, 100, 100, 100);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+ }
+
+ /**
+ * Checks the application progress seen on the RM side after each allocator
+ * heartbeat, for a FakeJob with 2 maps and no reduces.
+ *
+ * NOTE(review): the expected numbers are consistent with setup and cleanup
+ * weighing 5% each and the maps taking the remaining 90% when there are no
+ * reduces. That is inferred from the asserted values only; confirm against
+ * RMContainerAllocator.
+ */
+ @Test
+ public void testReportedAppProgressWithOnlyMaps() throws Exception {
+
+ LOG.info("Running testReportedAppProgressWithOnlyMaps");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ // Register a node for the AM and let it heartbeat once
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ // Job with 2 maps and 0 reduces
+ FakeJob job = new FakeJob(appAttemptId, conf, 2, 0);
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, job);
+
+ // No progress has been set on the job yet
+ allocator.schedule(); // Send heartbeat
+ dispatcher.await();
+ Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
+ // Arguments of setProgress: setup, map, reduce, cleanup
+ job.setProgress(100, 10, 0, 0);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(14f, app.getProgress(), 0.0);
+
+ job.setProgress(100, 60, 0, 0);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+ // Maps and cleanup done; reduce progress stays 0 as there are no reduces
+ job.setProgress(100, 100, 0, 100);
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+ }
+
+ /**
+ * FifoScheduler for tests: it is initialized inside its constructor and its
+ * allocate() works on copies of the resource requests it is given.
+ */
+ private static class MyFifoScheduler extends FifoScheduler {
+
+ /**
+ * Creates the scheduler and initializes it with a fresh Configuration, a
+ * new ContainerTokenSecretManager and the given context.
+ *
+ * @param rmContext context passed to reinitialize()
+ * @throws YarnException if reinitialize() fails with an IOException
+ */
+ public MyFifoScheduler(RMContext rmContext) {
+ super();
+ try {
+ reinitialize(new Configuration(), new ContainerTokenSecretManager(),
+ rmContext);
+ } catch (IOException ie) {
+ // Fail loudly. "assert (false)" does nothing unless the JVM runs with
+ // -ea, which would let a test continue with a scheduler whose
+ // initialization failed.
+ LOG.error("reinitialize of the scheduler failed with ", ie);
+ throw new YarnException(ie);
+ }
+ }
+
+ /**
+ * Copies every request in ask (priority, host name, capability and number
+ * of containers) and delegates to FifoScheduler with the copies.
+ */
+ // override this to copy the objects otherwise FifoScheduler updates the
+ // numContainers in same objects as kept by RMContainerAllocator
+ @Override
+ public synchronized Allocation allocate(
+ ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+ List<ContainerId> release) {
+ List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+ for (ResourceRequest req : ask) {
+ ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req
+ .getPriority(), req.getHostName(), req.getCapability(), req
+ .getNumContainers());
+ askCopy.add(reqCopy);
+ }
+ return super.allocate(applicationAttemptId, askCopy, release);
+ }
+ }
+
+ /**
+ * Shorthand for a map request that is not a retry of a failed attempt.
+ */
+ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+ int memory, String[] hosts) {
+ final boolean earlierFailedAttempt = false;
+ final boolean reduce = false;
+ return createReq(jobId, taskAttemptId, memory, hosts,
+ earlierFailedAttempt, reduce);
+ }
+
+ /**
+ * Builds a container request for attempt number taskAttemptId of task 0 of
+ * the given job.
+ *
+ * @param jobId job the task belongs to
+ * @param taskAttemptId id of the attempt within task 0
+ * @param memory value passed to BuilderUtils.newResource() for the container
+ * @param hosts requested hosts; only used when earlierFailedAttempt is false
+ * @param earlierFailedAttempt whether to build the request through
+ * createContainerRequestEventForFailedContainer()
+ * @param reduce true for a REDUCE task, false for a MAP task
+ */
+ private ContainerRequestEvent
+ createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
+ boolean earlierFailedAttempt, boolean reduce) {
+ TaskType type = reduce ? TaskType.REDUCE : TaskType.MAP;
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 0, type);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+ taskAttemptId);
+ Resource containerNeed = BuilderUtils.newResource(memory);
+ if (!earlierFailedAttempt) {
+ // regular request: the given hosts on the default rack
+ return new ContainerRequestEvent(attemptId, containerNeed, hosts,
+ new String[] { NetworkTopology.DEFAULT_RACK });
+ }
+ return ContainerRequestEvent
+ .createContainerRequestEventForFailedContainer(attemptId,
+ containerNeed);
+ }
+
+ /**
+ * Asserts that there is exactly one assignment per request, that all
+ * assigned containers have distinct ids, and that every request passes
+ * checkAssignment() against the first assignment made to its attempt.
+ */
+ private void checkAssignments(ContainerRequestEvent[] requests,
+ List<TaskAttemptContainerAssignedEvent> assignments,
+ boolean checkHostMatch) {
+ Assert.assertNotNull("Container not assigned", assignments);
+ Assert.assertEquals("Assigned count not correct", requests.length,
+ assignments.size());
+
+ // a set of the container ids shrinks if any id occurs twice
+ Set<ContainerId> distinctIds = new HashSet<ContainerId>();
+ for (TaskAttemptContainerAssignedEvent event : assignments) {
+ distinctIds.add(event.getContainer().getId());
+ }
+ Assert.assertEquals("Assigned containers must be different", assignments
+ .size(), distinctIds.size());
+
+ // pair each request with the assignment for the same attempt, if any
+ for (int i = 0; i < requests.length; i++) {
+ ContainerRequestEvent request = requests[i];
+ TaskAttemptContainerAssignedEvent match = null;
+ for (TaskAttemptContainerAssignedEvent candidate : assignments) {
+ if (candidate.getTaskAttemptID().equals(request.getAttemptID())) {
+ match = candidate;
+ break;
+ }
+ }
+ checkAssignment(request, match, checkHostMatch);
+ }
+ }
+
+ /**
+ * Asserts that an assignment exists for the request and belongs to the
+ * request's attempt, optionally also checking the assigned host.
+ *
+ * @param request the container request that should have been served
+ * @param assigned the assignment found for the request; null fails the test
+ * @param checkHostMatch whether the host of the assigned container must be
+ * one of the hosts named in the request
+ */
+ private void checkAssignment(ContainerRequestEvent request,
+ TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+ Assert.assertNotNull("Nothing assigned to attempt "
+ + request.getAttemptID(), assigned);
+ Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+ assigned.getTaskAttemptID());
+ if (checkHostMatch) {
+ // Compare host names only, as the other host check in this class does.
+ // Requests name bare hosts such as "h1" while nodes are registered as
+ // "h1:1234", so NodeId.toString() (presumably host:port) would never be
+ // found in the requested host list.
+ Assert.assertTrue("Not assigned to requested host", Arrays.asList(
+ request.getHosts()).contains(
+ assigned.getContainer().getNodeId().getHost()));
+ }
+ }
+
+ /**
+ * Mock RMContainerAllocator.
+ * Instead of talking to a remote scheduler it uses the application master
+ * service of the local MyResourceManager, and it never starts the allocator
+ * thread: tests drive heartbeats explicitly through schedule().
+ */
+ private static class MyContainerAllocator extends RMContainerAllocator {
+ // Container-assigned events captured by the mocked event handler. The list
+ // is static because createAppContext() is static (it runs while the
+ // arguments of the super constructor are evaluated), so it is shared by
+ // all allocator instances; schedule() empties it on every call.
+ static final List<TaskAttemptContainerAssignedEvent> events
+ = new ArrayList<TaskAttemptContainerAssignedEvent>();
+
+ private MyResourceManager rm;
+
+ /**
+ * Builds a mocked AppContext that returns the given job for any JobId and
+ * whose event handler records TaskAttemptContainerAssignedEvents in
+ * events, ignoring every other event.
+ */
+ @SuppressWarnings("rawtypes")
+ private static AppContext createAppContext(
+ ApplicationAttemptId appAttemptId, Job job) {
+ AppContext context = mock(AppContext.class);
+ ApplicationId appId = appAttemptId.getApplicationId();
+ when(context.getApplicationID()).thenReturn(appId);
+ when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
+ when(context.getJob(isA(JobId.class))).thenReturn(job);
+ when(context.getEventHandler()).thenReturn(new EventHandler() {
+ @Override
+ public void handle(Event event) {
+ // Only capture interesting events.
+ if (event instanceof TaskAttemptContainerAssignedEvent) {
+ events.add((TaskAttemptContainerAssignedEvent) event);
+ }
+ }
+ });
+ return context;
+ }
+
+ /** Builds a mocked ClientService with a fixed bind address and port. */
+ private static ClientService createMockClientService() {
+ ClientService service = mock(ClientService.class);
+ when(service.getBindAddress()).thenReturn(
+ NetUtils.createSocketAddr("localhost:4567"));
+ when(service.getHttpPort()).thenReturn(890);
+ return service;
+ }
+
+ /** Creates the allocator and immediately initializes and starts it. */
+ MyContainerAllocator(MyResourceManager rm, Configuration conf,
+ ApplicationAttemptId appAttemptId, Job job) {
+ super(createMockClientService(), createAppContext(appAttemptId, job));
+ this.rm = rm;
+ super.init(conf);
+ super.start();
+ }
+
+ /** Talks to the local resource manager instead of a remote proxy. */
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return this.rm.getApplicationMasterService();
+ }
+
+ @Override
+ protected void register() {
+ super.register();
+ }
+
+ /** Intentionally does nothing. */
+ @Override
+ protected void unregister() {
+ }
+
+ @Override
+ protected Resource getMinContainerCapability() {
+ return BuilderUtils.newResource(1024);
+ }
+
+ @Override
+ protected Resource getMaxContainerCapability() {
+ return BuilderUtils.newResource(10240);
+ }
+
+ /** Sends a single container request to the allocator. */
+ public void sendRequest(ContainerRequestEvent req) {
+ sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+ }
+
+ /** Hands every request, in order, to the allocator's handle(). */
+ public void sendRequests(List<ContainerRequestEvent> reqs) {
+ for (ContainerRequestEvent req : reqs) {
+ super.handle(req);
+ }
+ }
+
+ /**
+ * API to be used by tests: runs one heartbeat and returns the
+ * container-assigned events captured so far, clearing the capture list.
+ *
+ * @throws YarnException wrapping any exception thrown by the heartbeat
+ */
+ public List<TaskAttemptContainerAssignedEvent> schedule() {
+ // run the scheduler
+ try {
+ super.heartbeat();
+ } catch (Exception e) {
+ LOG.error("error in heartbeat ", e);
+ throw new YarnException(e);
+ }
+
+ List<TaskAttemptContainerAssignedEvent> result
+ = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+ events.clear();
+ return result;
+ }
+
+ // @Override makes the compiler verify that this really replaces the
+ // superclass method; otherwise the allocator thread would be started.
+ @Override
+ protected void startAllocatorThread() {
+ // override to NOT start thread
+ }
+ }
+
+ /**
+ * Runs the tests of this class one after another without a test runner;
+ * the first exception thrown by a test ends the run.
+ */
+ public static void main(String[] args) throws Exception {
+ TestRMContainerAllocator tests = new TestRMContainerAllocator();
+ tests.testSimple();
+ tests.testResource();
+ tests.testMapReduceScheduling();
+ tests.testReportedAppProgress();
+ tests.testReportedAppProgressWithOnlyMaps();
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Sep 29 00:09:56 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -280,15 +281,27 @@ public class TypeConverter {
}
+ /**
+ * Converts a JobReport into a mapred JobStatus. The priority is always
+ * NORMAL, the tracking URL is read from the report itself, and the
+ * report's diagnostics are set as the failure info of the status.
+ */
public static org.apache.hadoop.mapred.JobStatus fromYarn(
- JobReport jobreport, String jobFile, String trackingUrl) {
+ JobReport jobreport, String jobFile) {
JobPriority jobPriority = JobPriority.NORMAL;
- return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
- jobreport.getSetupProgress(), jobreport.getMapProgress(),
- jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
- fromYarn(jobreport.getJobState()),
- jobPriority, jobreport.getUser(), jobreport.getJobName(),
- jobFile, trackingUrl);
+ org.apache.hadoop.mapred.JobStatus jobStatus =
+ new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
+ jobreport.getSetupProgress(), jobreport.getMapProgress(),
+ jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
+ fromYarn(jobreport.getJobState()),
+ jobPriority, jobreport.getUser(), jobreport.getJobName(),
+ jobFile, jobreport.getTrackingUrl());
+ jobStatus.setFailureInfo(jobreport.getDiagnostics());
+ return jobStatus;
+ }
+
+ /**
+ * Converts a YARN queue state into the MapReduce queue state by looking up
+ * the lower-cased name of the YARN state.
+ *
+ * @param state YARN queue state; must not be null
+ * @return what QueueState.getState() returns for the lower-cased name
+ */
+ public static org.apache.hadoop.mapreduce.QueueState fromYarn(
+ QueueState state) {
+ // Lower-case with a fixed locale. With the default locale the result
+ // depends on the JVM's settings: under a Turkish locale the "I" of
+ // "RUNNING" becomes a dotless i and the lookup by name no longer matches.
+ org.apache.hadoop.mapreduce.QueueState qState =
+ org.apache.hadoop.mapreduce.QueueState.getState(
+ state.toString().toLowerCase(java.util.Locale.ENGLISH));
+ return qState;
}
+
public static int fromYarn(JobState state) {
switch (state) {
@@ -412,6 +425,7 @@ public class TypeConverter {
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
+ jobStatus.setFailureInfo(application.getDiagnostics());
return jobStatus;
}
@@ -431,9 +445,9 @@ public class TypeConverter {
+ /**
+ * Converts a YARN QueueInfo into a MapReduce QueueInfo, using the YARN
+ * record's string form as the second constructor argument, its converted
+ * queue state, and its applications converted by fromYarnApps().
+ */
public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo
queueInfo, Configuration conf) {
- return new QueueInfo(queueInfo.getQueueName(),
- queueInfo.toString(), QueueState.RUNNING,
- TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
+ // The queue state now comes from the YARN record instead of being fixed
+ // to RUNNING.
+ return new QueueInfo(queueInfo.getQueueName(),queueInfo.toString(),
+ fromYarn(queueInfo.getQueueState()), TypeConverter.fromYarnApps(
+ queueInfo.getApplications(), conf));
}
public static QueueInfo[] fromYarnQueueInfo(
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java?rev=1177117&r1=1177116&r2=1177117&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java Thu Sep 29 00:09:56 2011
@@ -29,6 +29,8 @@ public interface JobReport {
public abstract long getFinishTime();
public abstract String getUser();
public abstract String getJobName();
+ public abstract String getTrackingUrl();
+ public abstract String getDiagnostics();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@@ -40,4 +42,6 @@ public interface JobReport {
public abstract void setFinishTime(long finishTime);
public abstract void setUser(String user);
public abstract void setJobName(String jobName);
+ public abstract void setTrackingUrl(String trackingUrl);
+ public abstract void setDiagnostics(String diagnostics);
}