Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [31/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,895 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventContainerAllocated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeState;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Mock MRAppMaster. Doesn't start RPC servers.
+ * No threads are started except for the event Dispatcher thread.
+ */
+@SuppressWarnings("unchecked")
+public class MRApp extends MRAppMaster {
+ private static final Log LOG = LogFactory.getLog(MRApp.class);
+
+ int maps;
+ int reduces;
+
+ private File testWorkDir;
+ private Path testAbsPath;
+ private ClusterInfo clusterInfo;
+ private volatile boolean exited = false;
+
+ // TODO Default values. These will no longer be used if a test decides to mark a
+ // node as bad, in which case it should use the getters.
+ // Leaving these as public to prevent changes in multiple places.
+ public static String NM_HOST = "localhost";
+ public static int NM_PORT = 1234;
+ public static int NM_HTTP_PORT = 8042;
+
+ private int currentNmPort;
+
+ private static final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ //if true, tasks complete automatically as soon as they are launched
+ protected boolean autoComplete = false;
+
+ static ApplicationId applicationId;
+
+ // TODO: Look at getting rid of this. Each test should generate its own id,
+ // or have it provided. Using a custom id without updating this causes problems.
+ static {
+ applicationId = recordFactory.newRecordInstance(ApplicationId.class);
+ applicationId.setClusterTimestamp(0);
+ applicationId.setId(0);
+ }
+
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, Clock clock) {
+ this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
+ }
+
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart) {
+ this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
+ }
+
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount) {
+ this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
+ new SystemClock());
+ }
+
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount, Clock clock) {
+ this(getApplicationAttemptId(applicationId, startCount), getContainerId(
+ applicationId, startCount), maps, reduces, autoComplete, testName,
+ cleanOnStart, startCount, clock);
+ }
+
+ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+ int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount) {
+ this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
+ cleanOnStart, startCount, new SystemClock());
+ }
+
+ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+ int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount, Clock clock) {
+ super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
+ .currentTimeMillis());
+ this.currentNmPort = NM_PORT;
+ this.testWorkDir = new File("target", testName);
+ testAbsPath = new Path(testWorkDir.getAbsolutePath());
+ LOG.info("PathUsed: " + testAbsPath);
+ if (cleanOnStart) {
+ testAbsPath = new Path(testWorkDir.getAbsolutePath());
+ try {
+ FileContext.getLocalFSFileContext().delete(testAbsPath, true);
+ } catch (Exception e) {
+ LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
+ throw new YarnException("could not cleanup test dir", e);
+ }
+ }
+
+ applicationId = appAttemptId.getApplicationId();
+ this.maps = maps;
+ this.reduces = reduces;
+ this.autoComplete = autoComplete;
+ }
+
+
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ if (this.clusterInfo != null) {
+ getContext().getClusterInfo().setMinContainerCapability(
+ this.clusterInfo.getMinContainerCapability());
+ getContext().getClusterInfo().setMaxContainerCapability(
+ this.clusterInfo.getMaxContainerCapability());
+ } else {
+ getContext().getClusterInfo().setMinContainerCapability(
+ BuilderUtils.newResource(1024, 1));
+ getContext().getClusterInfo().setMaxContainerCapability(
+ BuilderUtils.newResource(10240, 1));
+ }
+ // TODO Is there any point in doing this here? Otherwise move to an overridden createDispatcher().
+// conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+ }
+
+ public Job submit(Configuration conf) throws Exception {
+ String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
+ .getCurrentUser().getShortUserName());
+ conf.set(MRJobConfig.USER_NAME, user);
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
+ conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
+ //TODO: fix the bug where the speculator gets events with
+ //not-fully-constructed objects. For now, disable speculative exec
+ LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ init(conf);
+ start();
+ DefaultMetricsSystem.shutdown();
+ Job job = getContext().getAllJobs().values().iterator().next();
+
+ // Write job.xml
+ String jobFile = MRApps.getJobFile(conf, user,
+ TypeConverter.fromYarn(job.getID()));
+ LOG.info("Writing job conf to " + jobFile);
+ new File(jobFile).getParentFile().mkdirs();
+ conf.writeXml(new FileOutputStream(jobFile));
+
+ return job;
+ }
+
+ public void waitForInternalState(TaskAttemptImpl attempt,
+ TaskAttemptStateInternal finalState) throws Exception {
+ int timeoutSecs = 0;
+ TaskAttemptReport report = attempt.getReport();
+ TaskAttemptStateInternal iState = attempt.getInternalState();
+ while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+ System.out.println("TaskAttempt Internal State is : " + iState
+ + " Waiting for Internal state : " + finalState + " progress : "
+ + report.getProgress());
+ Thread.sleep(500);
+ report = attempt.getReport();
+ iState = attempt.getInternalState();
+ }
+ System.out.println("TaskAttempt Internal State is : " + iState);
+ Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+ finalState, iState);
+ }
+
+ public void waitForState(TaskAttempt attempt,
+ TaskAttemptState finalState) throws Exception {
+ int timeoutSecs = 0;
+ TaskAttemptReport report = attempt.getReport();
+ while (!finalState.equals(report.getTaskAttemptState()) &&
+ timeoutSecs++ < 20) {
+ System.out.println("TaskAttempt State for " + attempt.getID() + " is : " +
+ report.getTaskAttemptState() +
+ " Waiting for state : " + finalState +
+ " progress : " + report.getProgress());
+ report = attempt.getReport();
+ Thread.sleep(500);
+ }
+ System.out.println("TaskAttempt State is : " + report.getTaskAttemptState());
+ Assert.assertEquals("TaskAttempt state is not correct (timedout)",
+ finalState,
+ report.getTaskAttemptState());
+ }
+
+ public void waitForState(Task task, TaskState finalState) throws Exception {
+ int timeoutSecs = 0;
+ TaskReport report = task.getReport();
+ while (!finalState.equals(report.getTaskState()) &&
+ timeoutSecs++ < 20) {
+ System.out.println("Task State for " + task.getID() + " is : "
+ + report.getTaskState() + " Waiting for state : " + finalState
+ + " progress : " + report.getProgress());
+ report = task.getReport();
+ Thread.sleep(500);
+ }
+ System.out.println("Task State is : " + report.getTaskState());
+ Assert.assertEquals("Task state is not correct (timedout)", finalState,
+ report.getTaskState());
+ }
+
+ public void waitForAMExit() throws Exception {
+ int timeoutSecs = 0;
+ while (!exited && timeoutSecs ++ < 20) {
+ System.out.println("Waiting for AM exit");
+ Thread.sleep(500);
+ }
+ System.out.print("AM Exit State is: " + exited);
+ Assert.assertEquals("AM did not exit (timedout)", true, exited);
+ }
+
+ public void waitForState(Job job, JobState finalState) throws Exception {
+ int timeoutSecs = 0;
+ JobReport report = job.getReport();
+ while (!finalState.equals(report.getJobState()) &&
+ timeoutSecs++ < 20) {
+ System.out.println("Job State is : " + report.getJobState() +
+ " Waiting for state : " + finalState +
+ " map progress : " + report.getMapProgress() +
+ " reduce progress : " + report.getReduceProgress());
+ report = job.getReport();
+ Thread.sleep(500);
+ }
+ System.out.println("Job State is : " + report.getJobState());
+ Assert.assertEquals("Job state is not correct (timedout)", finalState,
+ job.getState());
+ }
+
+ // Gets the current nodeId being used in simulations. A node will be forgotten
+ // if it turns unhealthy.
+ public String getCurrentNmHost() {
+ return NM_HOST;
+ }
+
+ public int getCurrentNmPort() {
+ return this.currentNmPort;
+ }
+
+ public int getCurrentNmHttpPort() {
+ return NM_HTTP_PORT;
+ }
+
+ public void waitForState(Service.STATE finalState) throws Exception {
+ int timeoutSecs = 0;
+ while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
+ System.out.println("MRApp State is : " + getServiceState()
+ + " Waiting for state : " + finalState);
+ Thread.sleep(500);
+ }
+ System.out.println("MRApp State is : " + getServiceState());
+ Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
+ getServiceState());
+ }
+
+ public void verifyCompleted() {
+ for (Job job : getContext().getAllJobs().values()) {
+ JobReport jobReport = job.getReport();
+ System.out.println("Job start time :" + jobReport.getStartTime());
+ System.out.println("Job finish time :" + jobReport.getFinishTime());
+ Assert.assertTrue("Job start time is not less than finish time",
+ jobReport.getStartTime() <= jobReport.getFinishTime());
+ Assert.assertTrue("Job finish time is in future",
+ jobReport.getFinishTime() <= System.currentTimeMillis());
+ for (Task task : job.getTasks().values()) {
+ TaskReport taskReport = task.getReport();
+ System.out.println("Task start time : " + taskReport.getStartTime());
+ System.out.println("Task finish time : " + taskReport.getFinishTime());
+ Assert.assertTrue("Task start time is not less than finish time",
+ taskReport.getStartTime() <= taskReport.getFinishTime());
+ for (TaskAttempt attempt : task.getAttempts().values()) {
+ TaskAttemptReport attemptReport = attempt.getReport();
+ Assert.assertTrue("Attempt start time is not less than finish time",
+ attemptReport.getStartTime() <= attemptReport.getFinishTime());
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
+ }
+
+ private static ApplicationAttemptId getApplicationAttemptId(
+ ApplicationId applicationId, int startCount) {
+ ApplicationAttemptId applicationAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ applicationAttemptId.setApplicationId(applicationId);
+ applicationAttemptId.setAttemptId(startCount);
+ return applicationAttemptId;
+ }
+
+ private static ContainerId getContainerId(ApplicationId applicationId,
+ int startCount) {
+ ApplicationAttemptId appAttemptId =
+ getApplicationAttemptId(applicationId, startCount);
+ ContainerId containerId =
+ BuilderUtils.newContainerId(appAttemptId, startCount);
+ return containerId;
+ }
+
+ @Override
+ protected Job createJob(Configuration conf) {
+ UserGroupInformation currentUser = null;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+ getDispatcher().getEventHandler(),
+ getTaskAttemptListener(), getContext().getClock(), getCommitter(),
+ isNewApiCommitter(), currentUser.getUserName(),
+ getTaskHeartbeatHandler(), getContext());
+ ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+ getDispatcher().register(JobFinishEvent.Type.class, new MRAppJobFinishHandler());
+
+ return newJob;
+ }
+
+ protected class MRAppJobFinishHandler extends JobFinishEventHandlerCR {
+
+ @Override
+ protected void exit() {
+ exited = true;
+ }
+
+ @Override
+ protected void maybeSendJobEndNotification() {
+ }
+ }
+
+ @Override
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+ return new TaskAttemptListener(){
+
+ @Override
+ public InetSocketAddress getAddress() {
+ return NetUtils.createSocketAddr("localhost:54321");
+ }
+
+ @Override
+ public void unregisterTaskAttempt(TaskAttemptId attemptID) {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+ @Override
+ public void registerRunningContainer(ContainerId containerId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void registerTaskAttempt(TaskAttemptId attemptId,
+ ContainerId containerId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void unregisterRunningContainer(ContainerId containerId) {
+ // TODO Auto-generated method stub
+
+ }
+ };
+ }
+
+ @Override
+ protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+ Configuration conf) {
+ return new TaskHeartbeatHandler(context, maps) {
+
+ @Override
+ public void init(Configuration conf) {
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+ };
+ }
+
+ @Override
+ protected ContainerHeartbeatHandler createContainerHeartbeatHandler(
+ AppContext context, Configuration conf) {
+ return new ContainerHeartbeatHandler(context, 1) {
+ @Override
+ public void init(Configuration conf) {
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+ };
+ }
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {//disable history
+ return new EventHandler<JobHistoryEvent>() {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ }
+ };
+ }
+
+
+
+ @Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ return new MockContainerLauncher();
+ }
+
+ // appAcls and attemptToContainerIdMap are shared between the various mocks.
+ protected Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+ protected Map<TaskAttemptId, ContainerId> attemptToContainerIdMap = new HashMap<TaskAttemptId, ContainerId>();
+
+ protected class MockContainerLauncher implements ContainerLauncher {
+
+ //We are running locally so set the shuffle port to -1
+ int shufflePort = -1;
+
+ public MockContainerLauncher() {
+ }
+
+ @Override
+ public void handle(NMCommunicatorEvent event) {
+ switch (event.getType()) {
+ case CONTAINER_LAUNCH_REQUEST:
+ LOG.info("DEBUG: Handling CONTAINER_LAUNCH_REQUEST for: " + event.getContainerId());
+
+ AMContainer amContainer = getContext().getAllContainers().get(event.getContainerId());
+ TaskAttemptId attemptIdForContainer = amContainer.getQueuedTaskAttempts().iterator().next();
+ // Container Launched.
+ getContext().getEventHandler().handle(
+ new AMContainerEventLaunched(event.getContainerId(), shufflePort));
+
+ // Simulate a TaskPull from the remote task.
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(event.getContainerId(),
+ AMContainerEventType.C_PULL_TA));
+
+ // Simulate a TaskAttemptStartedEvent to the TaskAttempt.
+ // Maybe simulate a completed task.
+ getContext().getEventHandler().handle(
+ new TaskAttemptEventStartedRemotely(attemptIdForContainer, event.getContainerId(), appAcls,
+ shufflePort));
+ attemptLaunched(attemptIdForContainer);
+
+ break;
+ case CONTAINER_STOP_REQUEST:
+ ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+ cs.setContainerId(event.getContainerId());
+ getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
+ break;
+ }
+ }
+ }
+
+ protected void attemptLaunched(TaskAttemptId attemptId) {
+ if (autoComplete) {
+ // send the done event
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
+ }
+ }
+
+ @Override
+ protected ContainerRequestor createContainerRequestor(
+ ClientService clientService, AppContext appContext) {
+ return new MRAppContainerRequestor(clientService, appContext);
+ }
+
+ protected class MRAppContainerRequestor extends RMContainerRequestor {
+
+ int numReleaseRequests;
+
+ public MRAppContainerRequestor(ClientService clientService,
+ AppContext context) {
+ super(clientService, context);
+ }
+
+ @Override public void init(Configuration conf) {}
+ @Override public void start() {}
+ @Override public void stop() {}
+ //TODO TODO: getApplicationAcls, getJob
+
+ @Override public void addContainerReq(ContainerRequest req) {}
+ @Override public void decContainerReq(ContainerRequest req) {}
+
+ public void handle(RMCommunicatorEvent rawEvent) {
+ LOG.info("DEBUG: MRAppContainerRequestor handling event of type:" + rawEvent.getType() + ", event: " + rawEvent + ", for containerId: ");
+ switch (rawEvent.getType()) {
+ case CONTAINER_DEALLOCATE:
+ numReleaseRequests++;
+ ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+ cs.setContainerId(((RMCommunicatorContainerDeAllocateRequestEvent)rawEvent).getContainerId());
+ getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
+ LOG.info("DEBUG: Sending out C_COMPLETE for containerId: " + cs.getContainerId());
+ break;
+ default:
+ LOG.warn("Invalid event of type: " + rawEvent.getType() + ", Event: "
+ + rawEvent);
+ break;
+ }
+ }
+
+ public int getNumReleaseRequests() {
+ return numReleaseRequests;
+ }
+ }
+
+ @Override
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
+ return new MRAppAMScheduler();
+ }
+
+ protected class MRAppAMScheduler extends AbstractService implements
+ ContainerAllocator {
+ private int containerCount;
+
+ MRAppAMScheduler() {
+ super(MRAppAMScheduler.class.getSimpleName());
+ }
+
+ public void start() {}
+ public void init(Configuration conf) {}
+ public void stop() {}
+
+ private NodeId getUsableNode() {
+ NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, currentNmPort);
+ getContext().getAllNodes().nodeSeen(nodeId);
+ if (getContext().getAllNodes().get(nodeId).getState() != AMNodeState.ACTIVE) {
+ LOG.info("Current node is not ACTIVE. Creating a new one");
+ currentNmPort++;
+ nodeId = BuilderUtils.newNodeId(NM_HOST, currentNmPort);
+ getContext().getAllNodes().nodeSeen(nodeId);
+ if (getContext().getAllNodes().get(nodeId).getState() != AMNodeState.ACTIVE) {
+ throw new YarnException("Failed to find a usable nodeId");
+ }
+ LOG.info("Created new nodeId: " + nodeId);
+ }
+ return nodeId;
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent rawEvent) {
+ LOG.info("DEBUG: MRAppAMScheduler handling event of type:" + rawEvent.getType() + ", event: " + rawEvent);
+ switch (rawEvent.getType()) {
+ case S_TA_LAUNCH_REQUEST:
+ AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)rawEvent;
+
+ // Wait for the node to be healthy before simulating a container allocation.
+ NodeId nodeId = getUsableNode();
+
+ ContainerId cId = Records.newRecord(ContainerId.class);
+ cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+ cId.setId(containerCount++);
+
+ Container container = BuilderUtils.newContainer(cId, nodeId,
+ NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+ getContext().getAllContainers().addContainerIfNew(container);
+
+ getContext().getEventHandler().handle(
+ new AMNodeEventContainerAllocated(container
+ .getNodeId(), container.getId()));
+
+ JobID id = TypeConverter.fromYarn(applicationId);
+ JobId jobId = TypeConverter.toYarn(id);
+ getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
+ new NormalizedResourceEvent(TaskType.REDUCE, 100)));
+ getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
+ new NormalizedResourceEvent(TaskType.MAP, 100)));
+
+ attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+ if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+ LOG.info("DEBUG: Sending launch request for container: " + cId
+ + " for taskAttemptId: " + lEvent.getAttemptID());
+ AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+ cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+ lEvent.getJobToken(), lEvent.getCredentials(), false,
+ new JobConf(getContext().getJob(jobId).getConf()));
+ getContext().getEventHandler().handle(lrEvent);
+ }
+ LOG.info("DEBUG: Assigning attempt [" + lEvent.getAttemptID()
+ + "] to Container [" + cId + "]");
+ getContext().getEventHandler().handle(
+ new AMContainerEventAssignTA(cId, lEvent.getAttemptID(), lEvent
+ .getRemoteTaskContext()));
+
+ break;
+ case S_TA_ENDED:
+ // Send out a Container_stop_request.
+ AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent;
+ LOG.info("DEBUG: Handling S_TA_ENDED for attemptId:"
+ + sEvent.getAttemptID() + " with state: " + sEvent.getState());
+ switch (sEvent.getState()) {
+ case FAILED:
+ case KILLED:
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ case SUCCEEDED:
+ // No re-use in MRApp. Stop the container.
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ default:
+ throw new YarnException("Unexpected state: " + sEvent.getState());
+ }
+ case S_CONTAINERS_ALLOCATED:
+ break;
+ case S_CONTAINER_COMPLETED:
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ @Override
+ protected TaskCleaner createTaskCleaner(AppContext context) {
+ return new TaskCleaner() {
+ @Override
+ public void handle(TaskCleanupEvent event) {
+ }
+ };
+ }
+
+ @Override
+ protected ClientService createClientService(AppContext context) {
+ return new ClientService(){
+ @Override
+ public InetSocketAddress getBindAddress() {
+ return NetUtils.createSocketAddr("localhost:9876");
+ }
+
+ @Override
+ public int getHttpPort() {
+ return -1;
+ }
+ };
+ }
+
+ public void setClusterInfo(ClusterInfo clusterInfo) {
+ // Only useful if set before a job is started.
+ if (getServiceState() == Service.STATE.NOTINITED
+ || getServiceState() == Service.STATE.INITED) {
+ this.clusterInfo = clusterInfo;
+ } else {
+ throw new IllegalStateException(
+ "ClusterInfo can only be set before the App is STARTED");
+ }
+ }
+
+ class TestJob extends JobImpl {
+ //override the init transition
+ private final TestInitTransition initTransition = new TestInitTransition(
+ maps, reduces);
+ StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+ = stateMachineFactory.addTransition(JobStateInternal.NEW,
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+ JobEventType.JOB_INIT,
+ // This is abusive.
+ initTransition);
+
+ private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+ localStateMachine;
+
+ @Override
+ protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+ return localStateMachine;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener, Clock clock,
+ OutputCommitter committer, boolean newApiCommitter, String user,
+ TaskHeartbeatHandler thh, AppContext appContext) {
+ super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+ conf, eventHandler, taskAttemptListener,
+ new JobTokenSecretManager(), new Credentials(), clock,
+ getCompletedTaskFromPreviousRun(), metrics, committer,
+ newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
+ thh, appContext);
+
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ localStateMachine = localFactory.make(this);
+ }
+ }
+
+ //Override InitTransition to not look for split files etc
+ static class TestInitTransition extends JobImpl.InitTransition {
+ private int maps;
+ private int reduces;
+ TestInitTransition(int maps, int reduces) {
+ this.maps = maps;
+ this.reduces = reduces;
+ }
+ @Override
+ protected void setup(JobImpl job) throws IOException {
+ super.setup(job);
+ job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
+ job.remoteJobConfFile = new Path("test");
+ }
+ @Override
+ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+ TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
+ for (int i = 0; i < maps ; i++) {
+ splits[i] = new TaskSplitMetaInfo();
+ }
+ return splits;
+ }
+ }
+
+
+ private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId taskAttemptId,
+ TaskAttemptState finalState) {
+ TaskAttemptStatus tas = new TaskAttemptStatus();
+ tas.id = taskAttemptId;
+ tas.progress = 1.0f;
+ tas.phase = Phase.CLEANUP;
+ tas.stateString = finalState.name();
+ tas.taskState = finalState;
+ Counters counters = new Counters();
+ tas.counters = counters;
+ return tas;
+ }
+
+ private void sendStatusUpdate(TaskAttemptId taskAttemptId,
+ TaskAttemptState finalState) {
+ TaskAttemptStatus tas = createTaskAttemptStatus(taskAttemptId, finalState);
+ getContext().getEventHandler().handle(
+ new TaskAttemptEventStatusUpdate(taskAttemptId, tas));
+ }
+
+ /*
+ * Helper method to move a task attempt into a final state.
+ */
+ // TODO maybe rename to something like succeedTaskAttempt
+ public void sendFinishToTaskAttempt(TaskAttemptId taskAttemptId,
+ TaskAttemptState finalState, boolean sendStatusUpdate) throws Exception {
+ if (sendStatusUpdate) {
+ sendStatusUpdate(taskAttemptId, finalState);
+ }
+ if (finalState == TaskAttemptState.SUCCEEDED) {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId,
+ TaskAttemptEventType.TA_DONE));
+ } else if (finalState == TaskAttemptState.KILLED) {
+ getContext().getEventHandler()
+ .handle(new TaskAttemptEventKillRequest(taskAttemptId,
+ "Kill requested"));
+ } else if (finalState == TaskAttemptState.FAILED) {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEventFailRequest(taskAttemptId, null));
+ }
+ }
+}
+
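A minimal usage sketch of the mock above, assuming the same test package and classpath as the added file; it only exercises the public helpers defined there (submit, waitForState, verifyCompleted) and the class and task-count names below are illustrative:

    package org.apache.hadoop.mapreduce.v2.app2;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.mapreduce.v2.app2.job.Job;
    import org.junit.Test;

    public class MRAppUsageSketch {
      @Test
      public void testAutoCompletingJob() throws Exception {
        // 2 maps, 1 reduce; autoComplete=true makes the mock container launcher
        // send TA_DONE as soon as an attempt is launched, and cleanOnStart=true
        // wipes the target/<testName> work dir before the run.
        MRApp app = new MRApp(2, 1, true, "MRAppUsageSketch", true);
        Job job = app.submit(new Configuration());

        // waitForState polls the job report every 500 ms and asserts if the
        // job has not reached SUCCEEDED within roughly 10 seconds.
        app.waitForState(job, JobState.SUCCEEDED);

        // Checks that job, task and attempt start/finish times are consistent.
        app.verifyCompleted();
      }
    }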
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,336 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class MRAppBenchmark {
+
+ /**
+ * Runs memory and time benchmark with Mock MRApp.
+ */
+ public void run(MRApp app) throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.WARN);
+ long startTime = System.currentTimeMillis();
+ Job job = app.submit(new Configuration());
+ while (!job.getReport().getJobState().equals(JobState.SUCCEEDED)) {
+ printStat(job, startTime);
+ Thread.sleep(2000);
+ }
+ printStat(job, startTime);
+ }
+
+ private void printStat(Job job, long startTime) throws Exception {
+ long currentTime = System.currentTimeMillis();
+ Runtime.getRuntime().gc();
+ long mem = Runtime.getRuntime().totalMemory()
+ - Runtime.getRuntime().freeMemory();
+ System.out.println("JobState:" + job.getState() +
+ " CompletedMaps:" + job.getCompletedMaps() +
+ " CompletedReduces:" + job.getCompletedReduces() +
+ " Memory(total-free)(KB):" + mem/1024 +
+ " ElapsedTime(ms):" + (currentTime - startTime));
+ }
+
+ //Throttles the maximum number of concurrent running tasks.
+ //This affects the memory requirement since
+ //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+//running tasks and discarded once the task is launched.
+ static class ThrottledMRApp extends MRApp {
+
+ int maxConcurrentRunningTasks;
+ volatile int concurrentRunningTasks;
+ ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+ super(maps, reduces, true, "ThrottledMRApp", true);
+ this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ super.attemptLaunched(attemptID);
+ //the task is launched and sends done immediately
+ concurrentRunningTasks--;
+ }
+
+ @Override
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
+ return new ThrottledContainerAllocator();
+ }
+
+ class ThrottledContainerAllocator extends AbstractService
+ implements ContainerAllocator {
+ private int containerCount;
+ private Thread thread;
+ private BlockingQueue<AMSchedulerEvent> eventQueue =
+ new LinkedBlockingQueue<AMSchedulerEvent>();
+ public ThrottledContainerAllocator() {
+ super("ThrottledContainerAllocator");
+ }
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+ @Override
+ public void start() {
+ thread = new Thread(new Runnable() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ AMSchedulerEvent event = null;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ if (concurrentRunningTasks < maxConcurrentRunningTasks) {
+ event = eventQueue.take();
+ switch(event.getType()) {
+ case S_TA_LAUNCH_REQUEST:
+ AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)event;
+ ContainerId cId = Records.newRecord(ContainerId.class);
+ cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+ cId.setId(containerCount++);
+ NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
+ Container container = BuilderUtils.newContainer(cId, nodeId,
+ NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+
+ getContext().getAllContainers().addContainerIfNew(container);
+ getContext().getAllNodes().nodeSeen(nodeId);
+
+ JobID id = TypeConverter.fromYarn(getContext().getApplicationID());
+ JobId jobId = TypeConverter.toYarn(id);
+
+ attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+ if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+
+ AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+ cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+ lEvent.getJobToken(), lEvent.getCredentials(), false,
+ new JobConf(getContext().getJob(jobId).getConf()));
+ getContext().getEventHandler().handle(lrEvent);
+ }
+
+ getContext().getEventHandler().handle(
+ new AMContainerEventAssignTA(cId, lEvent.getAttemptID(), lEvent
+ .getRemoteTaskContext()));
+ concurrentRunningTasks++;
+ break;
+
+ case S_TA_ENDED:
+ // Send out a Container_stop_request.
+ AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+ switch (sEvent.getState()) {
+ case FAILED:
+ case KILLED:
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ case SUCCEEDED:
+ // No re-use in MRApp. Stop the container.
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ default:
+ throw new YarnException("Unexpected state: " + sEvent.getState());
+ }
+ case S_CONTAINERS_ALLOCATED:
+ break;
+ case S_CONTAINER_COMPLETED:
+ break;
+ default:
+ break;
+ }
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Returning, interrupted");
+ return;
+ }
+ }
+ }
+ });
+ thread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ thread.interrupt();
+ super.stop();
+ }
+ }
+ }
+
+ @Test
+ public void benchmark1() throws Exception {
+ int maps = 100; // Adjust for benchmarking. Start with thousands.
+ int reduces = 0;
+ System.out.println("Running benchmark with maps:"+maps +
+ " reduces:"+reduces);
+ run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+ @Override
+ protected ContainerAllocator createAMScheduler(
+ ContainerRequestor requestor, AppContext appContext) {
+ return new RMContainerAllocator((RMContainerRequestor) requestor,
+ appContext);
+ }
+
+ @Override
+ protected ContainerRequestor createContainerRequestor(
+ ClientService clientService, AppContext appContext) {
+ return new RMContainerRequestor(clientService, appContext) {
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return new AMRMProtocol() {
+
+ @Override
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnRemoteException {
+ RegisterApplicationMasterResponse response =
+ Records.newRecord(RegisterApplicationMasterResponse.class);
+ response.setMinimumResourceCapability(BuilderUtils
+ .newResource(1024, 1));
+ response.setMaximumResourceCapability(BuilderUtils
+ .newResource(10240, 1));
+ return response;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnRemoteException {
+ FinishApplicationMasterResponse response =
+ Records.newRecord(FinishApplicationMasterResponse.class);
+ return response;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnRemoteException {
+
+ AllocateResponse response =
+ Records.newRecord(AllocateResponse.class);
+ List<ResourceRequest> askList = request.getAskList();
+ List<Container> containers = new ArrayList<Container>();
+ for (ResourceRequest req : askList) {
+ if (!"*".equals(req.getHostName())) {
+ continue;
+ }
+ int numContainers = req.getNumContainers();
+ for (int i = 0; i < numContainers; i++) {
+ ContainerId containerId =
+ BuilderUtils.newContainerId(
+ request.getApplicationAttemptId(),
+ request.getResponseId() + i);
+ containers.add(BuilderUtils
+ .newContainer(containerId, BuilderUtils.newNodeId("host"
+ + containerId.getId(), 2345),
+ "host" + containerId.getId() + ":5678", req
+ .getCapability(), req.getPriority(), null));
+ }
+ }
+
+ AMResponse amResponse = Records.newRecord(AMResponse.class);
+ amResponse.setAllocatedContainers(containers);
+ amResponse.setResponseId(request.getResponseId() + 1);
+ response.setAMResponse(amResponse);
+ response.setNumClusterNodes(350);
+ return response;
+ }
+ };
+ }
+ };
+ }
+ });
+ }
+
+ @Test
+ public void benchmark2() throws Exception {
+ int maps = 100; // Adjust for benchmarking; start with a couple of thousand.
+ int reduces = 50;
+ int maxConcurrentRunningTasks = 500;
+
+ System.out.println("Running benchmark with throttled running tasks with " +
+ "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+ " maps:" + maps + " reduces:" + reduces);
+ run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+ }
+
+ public static void main(String[] args) throws Exception {
+ MRAppBenchmark benchmark = new MRAppBenchmark();
+ benchmark.benchmark1();
+ benchmark.benchmark2();
+ }
+
+}
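A short sketch of driving the throttled benchmark above programmatically with a different ceiling, assuming the same test package and classpath; the class name and task counts here are illustrative values, not numbers from the commit:

    package org.apache.hadoop.mapreduce.v2.app2;

    public class ThrottledBenchmarkSketch {
      public static void main(String[] args) throws Exception {
        MRAppBenchmark benchmark = new MRAppBenchmark();
        // 200 maps / 20 reduces with at most 50 attempts in flight. The
        // ThrottledContainerAllocator stops draining its event queue while
        // concurrentRunningTasks >= maxConcurrentRunningTasks, and the count
        // drops as soon as each auto-completing attempt is launched.
        benchmark.run(new MRAppBenchmark.ThrottledMRApp(200, 20, 50));
      }
    }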
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MockJobs extends MockApps {
+ static final Iterator<JobState> JOB_STATES = Iterators.cycle(JobState
+ .values());
+ static final Iterator<TaskState> TASK_STATES = Iterators.cycle(TaskState
+ .values());
+ static final Iterator<TaskAttemptState> TASK_ATTEMPT_STATES = Iterators
+ .cycle(TaskAttemptState.values());
+ static final Iterator<TaskType> TASK_TYPES = Iterators.cycle(TaskType
+ .values());
+ static final Iterator<JobCounter> JOB_COUNTERS = Iterators.cycle(JobCounter
+ .values());
+ static final Iterator<FileSystemCounter> FS_COUNTERS = Iterators
+ .cycle(FileSystemCounter.values());
+ static final Iterator<TaskCounter> TASK_COUNTERS = Iterators
+ .cycle(TaskCounter.values());
+ static final Iterator<String> FS_SCHEMES = Iterators.cycle("FILE", "HDFS",
+ "LAFS", "CEPH");
+ static final Iterator<String> USER_COUNTER_GROUPS = Iterators
+ .cycle(
+ "com.company.project.subproject.component.subcomponent.UserDefinedSpecificSpecialTask$Counters",
+ "PigCounters");
+ static final Iterator<String> USER_COUNTERS = Iterators.cycle("counter1",
+ "counter2", "counter3");
+ static final Iterator<Phase> PHASES = Iterators.cycle(Phase.values());
+ static final Iterator<String> DIAGS = Iterators.cycle(
+ "Error: java.lang.OutOfMemoryError: Java heap space",
+ "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+ public static final String NM_HOST = "localhost";
+ public static final int NM_PORT = 1234;
+ public static final int NM_HTTP_PORT = 8042;
+
+ static final int DT = 1000000; // ms
+
+ public static String newJobName() {
+ return newAppName();
+ }
+
+ /**
+ * Create numJobs in a map with jobs having appId==jobId
+ */
+ public static Map<JobId, Job> newJobs(int numJobs, int numTasksPerJob,
+ int numAttemptsPerTask) {
+ Map<JobId, Job> map = Maps.newHashMap();
+ for (int j = 0; j < numJobs; ++j) {
+ ApplicationId appID = MockJobs.newAppID(j);
+ Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+ map.put(job.getID(), job);
+ }
+ return map;
+ }
+
+ public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+ int numTasksPerJob, int numAttemptsPerTask) {
+ Map<JobId, Job> map = Maps.newHashMap();
+ for (int j = 0; j < numJobsPerApp; ++j) {
+ Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+ map.put(job.getID(), job);
+ }
+ return map;
+ }
+
+ public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+ int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks) {
+ Map<JobId, Job> map = Maps.newHashMap();
+ for (int j = 0; j < numJobsPerApp; ++j) {
+ Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask, null,
+ hasFailedTasks);
+ map.put(job.getID(), job);
+ }
+ return map;
+ }
+
+ public static JobId newJobID(ApplicationId appID, int i) {
+ JobId id = Records.newRecord(JobId.class);
+ id.setAppId(appID);
+ id.setId(i);
+ return id;
+ }
+
+ public static JobReport newJobReport(JobId id) {
+ JobReport report = Records.newRecord(JobReport.class);
+ report.setJobId(id);
+ report
+ .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+ report.setFinishTime(System.currentTimeMillis()
+ + (int) (Math.random() * DT) + 1);
+ report.setMapProgress((float) Math.random());
+ report.setReduceProgress((float) Math.random());
+ report.setJobState(JOB_STATES.next());
+ return report;
+ }
+
+ public static TaskReport newTaskReport(TaskId id) {
+ TaskReport report = Records.newRecord(TaskReport.class);
+ report.setTaskId(id);
+ report
+ .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+ report.setFinishTime(System.currentTimeMillis()
+ + (int) (Math.random() * DT) + 1);
+ report.setProgress((float) Math.random());
+ report.setCounters(TypeConverter.toYarn(newCounters()));
+ report.setTaskState(TASK_STATES.next());
+ return report;
+ }
+
+ public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
+ TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
+ report.setTaskAttemptId(id);
+ report
+ .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+ report.setFinishTime(System.currentTimeMillis()
+ + (int) (Math.random() * DT) + 1);
+ report.setPhase(PHASES.next());
+ report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
+ report.setProgress((float) Math.random());
+ report.setCounters(TypeConverter.toYarn(newCounters()));
+ return report;
+ }
+
+ public static Counters newCounters() {
+ Counters hc = new Counters();
+ for (JobCounter c : JobCounter.values()) {
+ hc.findCounter(c).setValue((long) (Math.random() * 1000));
+ }
+ for (TaskCounter c : TaskCounter.values()) {
+ hc.findCounter(c).setValue((long) (Math.random() * 1000));
+ }
+ int nc = FileSystemCounter.values().length * 4;
+ for (int i = 0; i < nc; ++i) {
+ for (FileSystemCounter c : FileSystemCounter.values()) {
+ hc.findCounter(FS_SCHEMES.next(), c).setValue(
+ (long) (Math.random() * DT));
+ }
+ }
+ for (int i = 0; i < 2 * 3; ++i) {
+ hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
+ .setValue((long) (Math.random() * 100000));
+ }
+ return hc;
+ }
+
+ public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid,
+ int m) {
+ Map<TaskAttemptId, TaskAttempt> map = Maps.newHashMap();
+ for (int i = 0; i < m; ++i) {
+ TaskAttempt ta = newTaskAttempt(tid, i);
+ map.put(ta.getID(), ta);
+ }
+ return map;
+ }
+
+ public static TaskAttempt newTaskAttempt(TaskId tid, int i) {
+ final TaskAttemptId taid = Records.newRecord(TaskAttemptId.class);
+ taid.setTaskId(tid);
+ taid.setId(i);
+ final TaskAttemptReport report = newTaskAttemptReport(taid);
+ final List<String> diags = Lists.newArrayList();
+ diags.add(DIAGS.next());
+ return new TaskAttempt() {
+ @Override
+ public NodeId getNodeId() throws UnsupportedOperationException{
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskAttemptId getID() {
+ return taid;
+ }
+
+ @Override
+ public TaskAttemptReport getReport() {
+ return report;
+ }
+
+ @Override
+ public long getLaunchTime() {
+ return 0;
+ }
+
+ @Override
+ public long getFinishTime() {
+ return 0;
+ }
+
+ @Override
+ public int getShufflePort() {
+ return ShuffleHandler.DEFAULT_SHUFFLE_PORT;
+ }
+
+ @Override
+ public Counters getCounters() {
+ if (report != null && report.getCounters() != null) {
+ return new Counters(TypeConverter.fromYarn(report.getCounters()));
+ }
+ return null;
+ }
+
+ @Override
+ public float getProgress() {
+ return report.getProgress();
+ }
+
+ @Override
+ public TaskAttemptState getState() {
+ return report.getTaskAttemptState();
+ }
+
+ @Override
+ public boolean isFinished() {
+ switch (report.getTaskAttemptState()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public ContainerId getAssignedContainerID() {
+ ContainerId id = Records.newRecord(ContainerId.class);
+ ApplicationAttemptId appAttemptId = Records
+ .newRecord(ApplicationAttemptId.class);
+ appAttemptId.setApplicationId(taid.getTaskId().getJobId().getAppId());
+ appAttemptId.setAttemptId(0);
+ id.setApplicationAttemptId(appAttemptId);
+ return id;
+ }
+
+ @Override
+ public String getNodeHttpAddress() {
+ return "localhost:8042";
+ }
+
+ @Override
+ public List<String> getDiagnostics() {
+ return diags;
+ }
+
+ @Override
+ public String getAssignedContainerMgrAddress() {
+ return "localhost:9998";
+ }
+
+ @Override
+ public long getShuffleFinishTime() {
+ return 0;
+ }
+
+ @Override
+ public long getSortFinishTime() {
+ return 0;
+ }
+
+ @Override
+ public String getNodeRackName() {
+ return "/default-rack";
+ }
+ };
+ }
+
+ public static Map<TaskId, Task> newTasks(JobId jid, int n, int m, boolean hasFailedTasks) {
+ Map<TaskId, Task> map = Maps.newHashMap();
+ for (int i = 0; i < n; ++i) {
+ Task task = newTask(jid, i, m, hasFailedTasks);
+ map.put(task.getID(), task);
+ }
+ return map;
+ }
+
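+ // Mock Task; getCounters() returns null when hasFailedTasks is set, so counter aggregation skips it.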
+ public static Task newTask(JobId jid, int i, int m, final boolean hasFailedTasks) {
+ final TaskId tid = Records.newRecord(TaskId.class);
+ tid.setJobId(jid);
+ tid.setId(i);
+ tid.setTaskType(TASK_TYPES.next());
+ final TaskReport report = newTaskReport(tid);
+ final Map<TaskAttemptId, TaskAttempt> attempts = newTaskAttempts(tid, m);
+ return new Task() {
+ @Override
+ public TaskId getID() {
+ return tid;
+ }
+
+ @Override
+ public TaskReport getReport() {
+ return report;
+ }
+
+ @Override
+ public Counters getCounters() {
+ if (hasFailedTasks) {
+ return null;
+ }
+ return new Counters(TypeConverter.fromYarn(report.getCounters()));
+ }
+
+ @Override
+ public float getProgress() {
+ return report.getProgress();
+ }
+
+ @Override
+ public TaskType getType() {
+ return tid.getTaskType();
+ }
+
+ @Override
+ public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+ return attempts;
+ }
+
+ @Override
+ public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+ return attempts.get(attemptID);
+ }
+
+ @Override
+ public boolean isFinished() {
+ switch (report.getTaskState()) {
+ case SUCCEEDED:
+ case KILLED:
+ case FAILED:
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean canCommit(TaskAttemptId taskAttemptID) {
+ return false;
+ }
+
+ @Override
+ public TaskState getState() {
+ return report.getTaskState();
+ }
+
+ @Override
+ public boolean needsWaitAfterOutputConsumable() {
+ return false;
+ }
+
+ @Override
+ public TaskAttemptId getOutputConsumableAttempt() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ };
+ }
+
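+ // Aggregates counters over tasks that have them; tasks with null counters (failed) are skipped.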
+ public static Counters getCounters(Collection<Task> tasks) {
+ List<Task> completedTasks = new ArrayList<Task>();
+ for (Task task : tasks) {
+ if (task.getCounters() != null) {
+ completedTasks.add(task);
+ }
+ }
+ Counters counters = new Counters();
+ return JobImpl.incrTaskCounters(counters, completedTasks);
+ }
+
+ static class TaskCount {
+ int maps;
+ int reduces;
+ int completedMaps;
+ int completedReduces;
+
+ void incr(Task task) {
+ TaskType type = task.getType();
+ boolean finished = task.isFinished();
+ if (type == TaskType.MAP) {
+ if (finished) {
+ ++completedMaps;
+ }
+ ++maps;
+ } else if (type == TaskType.REDUCE) {
+ if (finished) {
+ ++completedReduces;
+ }
+ ++reduces;
+ }
+ }
+ }
+
+ static TaskCount getTaskCount(Collection<Task> tasks) {
+ TaskCount tc = new TaskCount();
+ for (Task task : tasks) {
+ tc.incr(task);
+ }
+ return tc;
+ }
+
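+ // Overloads: i is the job sequence number, n the task count, m the attempts per task.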
+ public static Job newJob(ApplicationId appID, int i, int n, int m) {
+ return newJob(appID, i, n, m, null);
+ }
+
+ public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
+ return newJob(appID, i, n, m, confFile, false);
+ }
+
+ public static Job newJob(ApplicationId appID, int i, int n, int m,
+ Path confFile, boolean hasFailedTasks) {
+ final JobId id = newJobID(appID, i);
+ final String name = newJobName();
+ final JobReport report = newJobReport(id);
+ final Map<TaskId, Task> tasks = newTasks(id, n, m, hasFailedTasks);
+ final TaskCount taskCount = getTaskCount(tasks.values());
+ final Counters counters = getCounters(tasks.values());
+ final Path configFile = confFile;
+
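+ // Build view ACLs for a fixed test user so ACL checks have real data to inspect.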
+ Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
+ final Configuration conf = new Configuration();
+ conf.set(JobACL.VIEW_JOB.getAclName(), "testuser");
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+
+ JobACLsManager aclsManager = new JobACLsManager(conf);
+ tmpJobACLs = aclsManager.constructJobACLs(conf);
+ final Map<JobACL, AccessControlList> jobACLs = tmpJobACLs;
+ return new Job() {
+ @Override
+ public JobId getID() {
+ return id;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public JobState getState() {
+ return report.getJobState();
+ }
+
+ @Override
+ public JobReport getReport() {
+ return report;
+ }
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public Counters getAllCounters() {
+ return counters;
+ }
+
+ @Override
+ public Map<TaskId, Task> getTasks() {
+ return tasks;
+ }
+
+ @Override
+ public Task getTask(TaskId taskID) {
+ return tasks.get(taskID);
+ }
+
+ @Override
+ public int getTotalMaps() {
+ return taskCount.maps;
+ }
+
+ @Override
+ public int getTotalReduces() {
+ return taskCount.reduces;
+ }
+
+ @Override
+ public int getCompletedMaps() {
+ return taskCount.completedMaps;
+ }
+
+ @Override
+ public int getCompletedReduces() {
+ return taskCount.completedReduces;
+ }
+
+ @Override
+ public boolean isUber() {
+ return false;
+ }
+
+ @Override
+ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+ int fromEventId, int maxEvents) {
+ return null;
+ }
+
+ @Override
+ public Map<TaskId, Task> getTasks(TaskType taskType) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public List<String> getDiagnostics() {
+ return Collections.<String> emptyList();
+ }
+
+ @Override
+ public boolean checkAccess(UserGroupInformation callerUGI,
+ JobACL jobOperation) {
+ return true;
+ }
+
+ @Override
+ public String getUserName() {
+ return "mock";
+ }
+
+ @Override
+ public String getQueueName() {
+ return "mockqueue";
+ }
+
+ @Override
+ public Path getConfFile() {
+ return configFile;
+ }
+
+ @Override
+ public Map<JobACL, AccessControlList> getJobACLs() {
+ return jobACLs;
+ }
+
+ @Override
+ public List<AMInfo> getAMInfos() {
+ List<AMInfo> amInfoList = new LinkedList<AMInfo>();
+ amInfoList.add(createAMInfo(1));
+ amInfoList.add(createAMInfo(2));
+ return amInfoList;
+ }
+
+ @Override
+ public Configuration loadConfFile() throws IOException {
+ FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
+ Configuration jobConf = new Configuration(false);
+ jobConf.addResource(fc.open(configFile), configFile.toString());
+ return jobConf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ };
+ }
+
+ private static AMInfo createAMInfo(int attempt) {
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(100, 1), attempt);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+ return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
+ containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
+ }
+}
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,333 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.tez.mapreduce.hadoop.TaskAttemptListenerImplTez;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt failure
+ * scenarios.
+ */
+@SuppressWarnings("unchecked")
+public class TestFail {
+
+ @Test
+ // The first attempt fails and the second attempt succeeds;
+ // the job as a whole succeeds.
+ public void testFailTask() throws Exception {
+ MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
+ Configuration conf = new Configuration();
+ // this test requires two task attempts, but uberization overrides max to 1
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ Map<TaskId,Task> tasks = job.getTasks();
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+ task.getReport().getTaskState());
+ Map<TaskAttemptId, TaskAttempt> attempts =
+ tasks.values().iterator().next().getAttempts();
+ Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
+ //one attempt must be failed
+ //and another must have succeeded
+ Iterator<TaskAttempt> it = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ it.next().getReport().getTaskAttemptState());
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+ it.next().getReport().getTaskAttemptState());
+ }
+
+ @Test
+ public void testMapFailureMaxPercent() throws Exception {
+ MRApp app = new MockFirstFailingTaskMRApp(4, 0);
+ Configuration conf = new Configuration();
+
+ // reduce the number of attempts so the test runs faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 20);
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+
+ // raising the tolerated failure percentage to 25% (1 of 4 maps is 25%)
+ // makes the job successful
+ app = new MockFirstFailingTaskMRApp(4, 0);
+ conf = new Configuration();
+
+ // reduce the number of attempts so the test runs faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 25);
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ }
+
+ @Test
+ public void testReduceFailureMaxPercent() throws Exception {
+ MRApp app = new MockFirstFailingTaskMRApp(2, 4);
+ Configuration conf = new Configuration();
+
+ // reduce the number of attempts so the test runs faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50); // map failures alone cannot fail the job
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 20);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+
+ // raising the tolerated failure percentage to 25% (1 of 4 reduces is 25%)
+ // makes the job successful
+ app = new MockFirstFailingTaskMRApp(2, 4);
+ conf = new Configuration();
+
+ // reduce the number of attempts so the test runs faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50); // map failures alone cannot fail the job
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 25);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ }
+
+ @Test
+ // All task attempts time out, leading to job failure.
+ public void testTimedOutTask() throws Exception {
+ MRApp app = new TimeOutTaskMRApp(1, 0);
+ Configuration conf = new Configuration();
+ int maxAttempts = 2;
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+ // disable uberization (requires entire job to be reattempted, so max for
+ // subtask attempts is overridden to 1)
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+ Map<TaskId,Task> tasks = job.getTasks();
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.FAILED,
+ task.getReport().getTaskState());
+ Map<TaskAttemptId, TaskAttempt> attempts =
+ tasks.values().iterator().next().getAttempts();
+ Assert.assertEquals("Num attempts is not correct", maxAttempts,
+ attempts.size());
+ for (TaskAttempt attempt : attempts.values()) {
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ attempt.getReport().getTaskAttemptState());
+ }
+ }
+
+ @Test
+ public void testTaskFailWithUnusedContainer() throws Exception {
+ MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
+ Configuration conf = new Configuration();
+ int maxAttempts = 1;
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+ // disable uberization (requires entire job to be reattempted, so max for
+ // subtask attempts is overridden to 1)
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Map<TaskId, Task> tasks = job.getTasks();
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+ Task task = tasks.values().iterator().next();
+ app.waitForState(task, TaskState.SCHEDULED);
+ Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+ .next().getAttempts();
+ Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
+ .size());
+ TaskAttempt attempt = attempts.values().iterator().next();
+ app.waitForInternalState((TaskAttemptImpl) attempt,
+ TaskAttemptStateInternal.START_WAIT);
+ // TODO XXX: This may not be a valid test.
+ app.getDispatcher().getEventHandler().handle(
+ new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED));
+ app.waitForState(job, JobState.FAILED);
+ }
+
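+ // MRApp whose launcher ignores stop requests and whose CM proxy blocks forever, so the container is never used.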
+ static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
+
+ public MRAppWithFailingTaskAndUnusedContainer() {
+ super(1, 0, false, "TaskFailWithUnusedContainer", true);
+ }
+
+ @Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ return new ContainerLauncherImpl(context) {
+ @Override
+ public void handle(NMCommunicatorEvent event) {
+
+ switch (event.getType()) {
+ case CONTAINER_LAUNCH_REQUEST:
+ super.handle(event); // Unused event and container.
+ break;
+ case CONTAINER_STOP_REQUEST:
+// getContext().getEventHandler().handle(
+// new TaskAttemptEvent(event.getTaskAttemptID(),
+// TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ // TODO XXX: May need a CONTAINER_COMPLETED event to go out.
+ break;
+ }
+ }
+
+ @Override
+ protected ContainerManager getCMProxy(ContainerId containerID,
+ String containerManagerBindAddr, ContainerToken containerToken)
+ throws IOException {
+ try {
+ synchronized (this) {
+ wait(); // Just hang the thread simulating a very slow NM.
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ };
+ }
+ }
+
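+ // MRApp with a stubbed RPC listener and a one-second task timeout so the heartbeat handler quickly marks attempts as lost.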
+ static class TimeOutTaskMRApp extends MRApp {
+ TimeOutTaskMRApp(int maps, int reduces) {
+ super(maps, reduces, false, "TimeOutTaskMRApp", true);
+ }
+ @Override
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+ return new TaskAttemptListenerImplTez(getContext(), thh, chh, null) {
+ @Override
+ public void startRpcServer() {}
+ @Override
+ public void stopRpcServer() {}
+ @Override
+ public InetSocketAddress getAddress() {
+ return NetUtils.createSocketAddr("localhost", 1234);
+ }
+ };
+ }
+
+ @Override
+ protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+ Configuration conf) {
+ // Creates a TaskHeartbeatHandler with a low timeout value. THH will
+ // send out a lost event leading to attempt failure.
+ return new TaskHeartbeatHandler(getContext(), 1) {
+ @Override
+ public void init(Configuration conf) {
+ conf.setInt(MRJobConfig.TASK_TIMEOUT, 1 * 1000); // shorten the task timeout to 1s
+ conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1 * 1000);
+ super.init(conf);
+ }
+ };
+ }
+ }
+
+ // All attempts of the first task are failed
+ static class MockFirstFailingTaskMRApp extends MRApp {
+
+ MockFirstFailingTaskMRApp(int maps, int reduces) {
+ super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ if (attemptID.getTaskId().getId() == 0) { // first task
+ // send the fail event
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+ // TODO XXX: Was FAIL_MSG. Remove comment.
+ } else {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_DONE));
+ }
+ }
+ }
+
+ // The first attempt of the first task is failed
+ static class MockFirstFailingAttemptMRApp extends MRApp {
+ MockFirstFailingAttemptMRApp(int maps, int reduces) {
+ super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+ // first task's first attempt
+ // send the fail event
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+ // TODO XXX: Was FAIL_MSG. Remove comment.
+ } else {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_DONE));
+ }
+ }
+ }
+
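+ // Entry point for running these tests directly, without a JUnit runner.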
+ public static void main(String[] args) throws Exception {
+ TestFail t = new TestFail();
+ t.testFailTask();
+ t.testTimedOutTask();
+ t.testMapFailureMaxPercent();
+ t.testReduceFailureMaxPercent();
+ t.testTaskFailWithUnusedContainer();
+ }
+}