You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/27 20:54:52 UTC
svn commit: r1224995 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...
Author: vinodkv
Date: Tue Dec 27 19:54:51 2011
New Revision: 1224995
URL: http://svn.apache.org/viewvc?rev=1224995&view=rev
Log:
MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 27 19:54:51 2011
@@ -179,6 +179,8 @@ Release 0.23.1 - Unreleased
immediately after downloading a resource instead of always waiting for a
second. (Siddarth Seth via vinodkv)
+ MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
+
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Dec 27 19:54:51 2011
@@ -228,7 +228,7 @@ public class MRAppMaster extends Composi
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
- dispatcher = new AsyncDispatcher();
+ dispatcher = createDispatcher();
addIfService(dispatcher);
}
@@ -291,6 +291,10 @@ public class MRAppMaster extends Composi
super.init(conf);
} // end of init()
+ protected Dispatcher createDispatcher() {
+ return new AsyncDispatcher();
+ }
+
private OutputCommitter createOutputCommitter(Configuration conf) {
OutputCommitter committer = null;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Tue Dec 27 19:54:51 2011
@@ -53,6 +53,7 @@ public interface Job {
int getTotalReduces();
int getCompletedMaps();
int getCompletedReduces();
+ float getProgress();
boolean isUber();
String getUserName();
String getQueueName();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Dec 27 19:54:51 2011
@@ -128,6 +128,10 @@ public class JobImpl implements org.apac
private final String username;
private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
+ private float setupWeight = 0.05f;
+ private float cleanupWeight = 0.05f;
+ private float mapWeight = 0.0f;
+ private float reduceWeight = 0.0f;
private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
private final Lock readLock;
@@ -147,7 +151,7 @@ public class JobImpl implements org.apac
private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
- private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
+ volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters();
// FIXME:
//
@@ -353,6 +357,8 @@ public class JobImpl implements org.apac
private long startTime;
private long finishTime;
private float setupProgress;
+ private float mapProgress;
+ private float reduceProgress;
private float cleanupProgress;
private boolean isUber = false;
@@ -587,30 +593,51 @@ public class JobImpl implements org.apac
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
}
+ computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
- computeProgress(mapTasks), computeProgress(reduceTasks),
+ this.mapProgress, this.reduceProgress,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally {
readLock.unlock();
}
}
- private float computeProgress(Set<TaskId> taskIds) {
- readLock.lock();
+ @Override
+ public float getProgress() {
+ this.readLock.lock();
+ try {
+ computeProgress();
+ return (this.setupProgress * this.setupWeight + this.cleanupProgress
+ * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
+ * this.reduceWeight);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private void computeProgress() {
+ this.readLock.lock();
try {
- float progress = 0;
- for (TaskId taskId : taskIds) {
- Task task = tasks.get(taskId);
- progress += task.getProgress();
- }
- int taskIdsSize = taskIds.size();
- if (taskIdsSize != 0) {
- progress = progress/taskIdsSize;
+ float mapProgress = 0f;
+ float reduceProgress = 0f;
+ for (Task task : this.tasks.values()) {
+ if (task.getType() == TaskType.MAP) {
+ mapProgress += task.getProgress();
+ } else {
+ reduceProgress += task.getProgress();
+ }
+ }
+ if (this.numMapTasks != 0) {
+ mapProgress = mapProgress / this.numMapTasks;
}
- return progress;
+ if (this.numReduceTasks != 0) {
+ reduceProgress = reduceProgress / this.numReduceTasks;
+ }
+ this.mapProgress = mapProgress;
+ this.reduceProgress = reduceProgress;
} finally {
- readLock.unlock();
+ this.readLock.unlock();
}
}
@@ -731,7 +758,7 @@ public class JobImpl implements org.apac
static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success
- if (job.completedTaskCount == job.getTasks().size()) {
+ if (job.completedTaskCount == job.tasks.size()) {
try {
// Commit job & do cleanup
job.getCommitter().commitJob(job.getJobContext());
@@ -970,6 +997,12 @@ public class JobImpl implements org.apac
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
+ } else if (job.numMapTasks == 0) {
+ job.reduceWeight = 0.9f;
+ } else if (job.numReduceTasks == 0) {
+ job.mapWeight = 0.9f;
+ } else {
+ job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Dec 27 19:54:51 2011
@@ -376,7 +376,7 @@ public abstract class TaskImpl implement
try {
TaskAttempt bestAttempt = selectBestAttempt();
if (bestAttempt == null) {
- return 0;
+ return 0f;
}
return bestAttempt.getProgress();
} finally {
@@ -457,9 +457,10 @@ public abstract class TaskImpl implement
result = at; //The first time around
}
// calculate the best progress
- if (at.getProgress() > progress) {
+ float attemptProgress = at.getProgress();
+ if (attemptProgress > progress) {
result = at;
- progress = at.getProgress();
+ progress = attemptProgress;
}
}
return result;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Dec 27 19:54:51 2011
@@ -128,25 +128,7 @@ public abstract class RMCommunicator ext
protected float getApplicationProgress() {
// For now just a single job. In future when we have a DAG, we need an
// aggregate progress.
- JobReport report = this.job.getReport();
- float setupWeight = 0.05f;
- float cleanupWeight = 0.05f;
- float mapWeight = 0.0f;
- float reduceWeight = 0.0f;
- int numMaps = this.job.getTotalMaps();
- int numReduces = this.job.getTotalReduces();
- if (numMaps == 0 && numReduces == 0) {
- } else if (numMaps == 0) {
- reduceWeight = 0.9f;
- } else if (numReduces == 0) {
- mapWeight = 0.9f;
- } else {
- mapWeight = reduceWeight = 0.45f;
- }
- return (report.getSetupProgress() * setupWeight
- + report.getCleanupProgress() * cleanupWeight
- + report.getMapProgress() * mapWeight + report.getReduceProgress()
- * reduceWeight);
+ return this.job.getProgress();
}
protected void register() {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Tue Dec 27 19:54:51 2011
@@ -161,7 +161,7 @@ public abstract class RMContainerRequest
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources +
- "knownNMs=" + clusterNmCount);
+ " knownNMs=" + clusterNmCount);
ask.clear();
release.clear();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Dec 27 19:54:51 2011
@@ -115,7 +115,8 @@ public class MRApp extends MRAppMaster {
applicationId.setId(0);
}
- public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
@@ -141,10 +142,17 @@ public class MRApp extends MRAppMaster {
return containerId;
}
- public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
- super(getApplicationAttemptId(applicationId, startCount), getContainerId(
- applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System
+ this(getApplicationAttemptId(applicationId, startCount), getContainerId(
+ applicationId, startCount), maps, reduces, autoComplete, testName,
+ cleanOnStart, startCount);
+ }
+
+ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+ int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount) {
+ super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
@@ -205,9 +213,9 @@ public class MRApp extends MRAppMaster {
TaskReport report = task.getReport();
while (!finalState.equals(report.getTaskState()) &&
timeoutSecs++ < 20) {
- System.out.println("Task State is : " + report.getTaskState() +
- " Waiting for state : " + finalState +
- " progress : " + report.getProgress());
+ System.out.println("Task State for " + task.getID() + " is : "
+ + report.getTaskState() + " Waiting for state : " + finalState
+ + " progress : " + report.getProgress());
report = task.getReport();
Thread.sleep(500);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Tue Dec 27 19:54:51 2011
@@ -426,6 +426,11 @@ public class MockJobs extends MockApps {
}
@Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
public Counters getCounters() {
return counters;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 27 19:54:51 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -36,15 +37,20 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
@@ -78,6 +84,7 @@ import org.apache.hadoop.yarn.util.Build
import org.junit.After;
import org.junit.Test;
+@SuppressWarnings("unchecked")
public class TestRMContainerAllocator {
static final Log LOG = LogFactory
@@ -338,98 +345,155 @@ public class TestRMContainerAllocator {
}
}
- private static class FakeJob extends JobImpl {
-
- public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
- int numMaps, int numReduces) {
- super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
- appAttemptID, conf, null, null, null, null, null, null, null, null,
- true, null, System.currentTimeMillis(), null);
- this.jobId = getID();
- this.numMaps = numMaps;
- this.numReduces = numReduces;
- }
-
- private float setupProgress;
- private float mapProgress;
- private float reduceProgress;
- private float cleanupProgress;
- private final int numMaps;
- private final int numReduces;
- private JobId jobId;
-
- void setProgress(float setupProgress, float mapProgress,
- float reduceProgress, float cleanupProgress) {
- this.setupProgress = setupProgress;
- this.mapProgress = mapProgress;
- this.reduceProgress = reduceProgress;
- this.cleanupProgress = cleanupProgress;
- }
-
- @Override
- public int getTotalMaps() { return this.numMaps; }
- @Override
- public int getTotalReduces() { return this.numReduces;}
-
- @Override
- public JobReport getReport() {
- return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
- JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
- this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
- }
- }
-
@Test
public void testReportedAppProgress() throws Exception {
LOG.info("Running testReportedAppProgress");
Configuration conf = new Configuration();
- MyResourceManager rm = new MyResourceManager(conf);
+ final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
- RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ RMApp rmApp = rm.submitApp(1024);
+ rmDispatcher.await();
- MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rmDispatcher.await();
- ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rmDispatcher.await();
- FakeJob job = new FakeJob(appAttemptId, conf, 2, 2);
- MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
- appAttemptId, job);
+ MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+ appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new MyContainerAllocator(rm, appAttemptId, context);
+ };
+ };
+
+ Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
+
+ mrApp.submit(conf);
+ Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+ .getValue();
+
+ DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+
+ MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+ .getContainerAllocator();
+
+ mrApp.waitForState(job, JobState.RUNNING);
+
+ amDispatcher.await();
+ // Wait till all map-attempts request for containers
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.MAP) {
+ mrApp.waitForState(t.getAttempts().values().iterator().next(),
+ TaskAttemptState.UNASSIGNED);
+ }
+ }
+ amDispatcher.await();
+
+ allocator.schedule();
+ rmDispatcher.await();
+ amNodeManager.nodeHeartbeat(true);
+ rmDispatcher.await();
+ allocator.schedule();
+ rmDispatcher.await();
+
+ // Wait for all map-tasks to be running
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.MAP) {
+ mrApp.waitForState(t, TaskState.RUNNING);
+ }
+ }
allocator.schedule(); // Send heartbeat
- dispatcher.await();
- Assert.assertEquals(0.0, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
+
+ // Finish off 1 map.
+ Iterator<Task> it = job.getTasks().values().iterator();
+ finishNextNTasks(mrApp, it, 1);
+ allocator.schedule();
+ rmDispatcher.await();
+ Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
- job.setProgress(100, 10, 0, 0);
+ // Finish off 7 more so that map-progress is 80%
+ finishNextNTasks(mrApp, it, 7);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(9.5f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
+
+ // Finish off the 2 remaining maps
+ finishNextNTasks(mrApp, it, 2);
+
+ // Wait till all reduce-attempts request for containers
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.REDUCE) {
+ mrApp.waitForState(t.getAttempts().values().iterator().next(),
+ TaskAttemptState.UNASSIGNED);
+ }
+ }
- job.setProgress(100, 80, 0, 0);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(41.0f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ amNodeManager.nodeHeartbeat(true);
+ rmDispatcher.await();
+ allocator.schedule();
+ rmDispatcher.await();
+
+ // Wait for all reduce-tasks to be running
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.REDUCE) {
+ mrApp.waitForState(t, TaskState.RUNNING);
+ }
+ }
+
+ // Finish off 2 reduces
+ finishNextNTasks(mrApp, it, 2);
- job.setProgress(100, 100, 20, 0);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
- job.setProgress(100, 100, 100, 100);
+ // Finish off the remaining 8 reduces.
+ finishNextNTasks(mrApp, it, 8);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ // Remaining is JobCleanup
+ Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
+ }
+
+ private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
+ throws Exception {
+ Task task;
+ for (int i=0; i<nextN; i++) {
+ task = it.next();
+ finishTask(mrApp, task);
+ }
+ }
+
+ private void finishTask(MRApp mrApp, Task task) throws Exception {
+ TaskAttempt attempt = task.getAttempts().values().iterator().next();
+ mrApp.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+ mrApp.waitForState(task, TaskState.SUCCEEDED);
}
@Test
@@ -438,46 +502,96 @@ public class TestRMContainerAllocator {
LOG.info("Running testReportedAppProgressWithOnlyMaps");
Configuration conf = new Configuration();
- MyResourceManager rm = new MyResourceManager(conf);
+ final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
- RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ RMApp rmApp = rm.submitApp(1024);
+ rmDispatcher.await();
- MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rmDispatcher.await();
- ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rmDispatcher.await();
- FakeJob job = new FakeJob(appAttemptId, conf, 2, 0);
- MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
- appAttemptId, job);
+ MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+ appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new MyContainerAllocator(rm, appAttemptId, context);
+ };
+ };
+
+ Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
+
+ mrApp.submit(conf);
+ Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+ .getValue();
+
+ DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+
+ MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+ .getContainerAllocator();
+
+ mrApp.waitForState(job, JobState.RUNNING);
+
+ amDispatcher.await();
+ // Wait till all map-attempts request for containers
+ for (Task t : job.getTasks().values()) {
+ mrApp.waitForState(t.getAttempts().values().iterator().next(),
+ TaskAttemptState.UNASSIGNED);
+ }
+ amDispatcher.await();
+
+ allocator.schedule();
+ rmDispatcher.await();
+ amNodeManager.nodeHeartbeat(true);
+ rmDispatcher.await();
+ allocator.schedule();
+ rmDispatcher.await();
+
+ // Wait for all map-tasks to be running
+ for (Task t : job.getTasks().values()) {
+ mrApp.waitForState(t, TaskState.RUNNING);
+ }
allocator.schedule(); // Send heartbeat
- dispatcher.await();
- Assert.assertEquals(0.0, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
+
+ Iterator<Task> it = job.getTasks().values().iterator();
- job.setProgress(100, 10, 0, 0);
+ // Finish off 1 map so that map-progress is 10%
+ finishNextNTasks(mrApp, it, 1);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(14f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
- job.setProgress(100, 60, 0, 0);
+ // Finish off 5 more map so that map-progress is 60%
+ finishNextNTasks(mrApp, it, 5);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
- job.setProgress(100, 100, 0, 100);
+ // Finish off remaining map so that map-progress is 100%
+ finishNextNTasks(mrApp, it, 4);
allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+ rmDispatcher.await();
+ Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+ Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
@Test
@@ -1000,7 +1114,6 @@ public class TestRMContainerAllocator {
private MyResourceManager rm;
- @SuppressWarnings("rawtypes")
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class);
@@ -1028,7 +1141,15 @@ public class TestRMContainerAllocator {
return service;
}
- MyContainerAllocator(MyResourceManager rm, Configuration conf,
+ // Use this constructor when using a real job.
+ MyContainerAllocator(MyResourceManager rm,
+ ApplicationAttemptId appAttemptId, AppContext context) {
+ super(createMockClientService(), context);
+ this.rm = rm;
+ }
+
+ // Use this constructor when you are using a mocked job.
+ public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job) {
super(createMockClientService(), createAppContext(appAttemptId, job));
this.rm = rm;
@@ -1090,6 +1211,7 @@ public class TestRMContainerAllocator {
return result;
}
+ @Override
protected void startAllocatorThread() {
// override to NOT start thread
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Dec 27 19:54:51 2011
@@ -394,6 +394,11 @@ public class TestRuntimeEstimators {
}
@Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Dec 27 19:54:51 2011
@@ -65,7 +65,7 @@ public class TestJobImpl {
Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask);
- when(mockJob.getTasks()).thenReturn(tasks);
+ mockJob.tasks = tasks;
when(mockJob.getState()).thenReturn(JobState.ERROR);
JobEvent mockJobEvent = mock(JobEvent.class);
@@ -73,11 +73,12 @@ public class TestJobImpl {
Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
JobState.ERROR, state);
}
-
+
@Test
public void testCheckJobCompleteSuccess() {
JobImpl mockJob = mock(JobImpl.class);
+ mockJob.tasks = new HashMap<TaskId, Task>();
OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class);
JobContext mockJobContext = mock(JobContext.class);
@@ -110,7 +111,7 @@ public class TestJobImpl {
Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask);
- when(mockJob.getTasks()).thenReturn(tasks);
+ mockJob.tasks = tasks;
try {
// Just in case the code breaks and reaches these calls
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue Dec 27 19:54:51 2011
@@ -136,6 +136,11 @@ public class CompletedJob implements org
}
@Override
+ public float getProgress() {
+ return 1.0f;
+ }
+
+ @Override
public JobState getState() {
return report.getJobState();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1224995&r1=1224994&r2=1224995&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue Dec 27 19:54:51 2011
@@ -90,6 +90,11 @@ public class PartialJob implements org.a
}
@Override
+ public float getProgress() {
+ return 1.0f;
+ }
+
+ @Override
public Counters getCounters() {
return null;
}