You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/02/08 00:07:57 UTC
svn commit: r1241693 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Author: sseth
Date: Tue Feb 7 23:07:56 2012
New Revision: 1241693
URL: http://svn.apache.org/viewvc?rev=1241693&view=rev
Log:
merge MAPREDUCE-3823 from trunk
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Feb 7 23:07:56 2012
@@ -711,6 +711,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3826. Fixed a bug in RM web-ui which broke sorting. (Jonathan
Eagles via acmurthy)
+ MAPREDUCE-3823. Ensure counters are calculated only once after a job
+ finishes. (Vinod Kumar Vavilapalli via sseth)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Feb 7 23:07:56 2012
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -106,7 +107,7 @@ import org.apache.hadoop.yarn.state.Stat
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
@@ -153,6 +154,10 @@ public class JobImpl implements org.apac
private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = new Counters();
+ private Object fullCountersLock = new Object();
+ private Counters fullCounters = null;
+ private Counters finalMapCounters = null;
+ private Counters finalReduceCounters = null;
// FIXME:
//
// Can then replace task-level uber counters (MR-2424) with job-level ones
@@ -473,11 +478,21 @@ public class JobImpl implements org.apac
@Override
public Counters getAllCounters() {
- Counters counters = new Counters();
+
readLock.lock();
+
try {
+ JobState state = getState();
+ if (state == JobState.ERROR || state == JobState.FAILED
+ || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+ this.mayBeConstructFinalFullCounters();
+ return fullCounters;
+ }
+
+ Counters counters = new Counters();
counters.incrAllCounters(jobCounters);
return incrTaskCounters(counters, tasks.values());
+
} finally {
readLock.unlock();
}
@@ -525,17 +540,21 @@ public class JobImpl implements org.apac
try {
JobState state = getState();
+ // jobFile can be null if the job is not yet inited.
+ String jobFile =
+ remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
+
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
- cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
+ cleanupProgress, jobFile, amInfos, isUber);
}
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
- cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
+ cleanupProgress, jobFile, amInfos, isUber);
} finally {
readLock.unlock();
}
@@ -1143,26 +1162,49 @@ public class JobImpl implements org.apac
// not be generated for KilledJobs, etc.
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
- Counters mapCounters = new Counters();
- Counters reduceCounters = new Counters();
- for (Task t : job.tasks.values()) {
- Counters counters = t.getCounters();
- switch (t.getType()) {
- case MAP: mapCounters.incrAllCounters(counters); break;
- case REDUCE: reduceCounters.incrAllCounters(counters); break;
- }
- }
+ job.mayBeConstructFinalFullCounters();
JobFinishedEvent jfe = new JobFinishedEvent(
job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount,
- mapCounters,
- reduceCounters,
- job.getAllCounters());
+ job.finalMapCounters,
+ job.finalReduceCounters,
+ job.fullCounters);
return jfe;
}
+ private void mayBeConstructFinalFullCounters() {
+ // Calculating full-counters. This should happen only once for the job.
+ synchronized (this.fullCountersLock) {
+ if (this.fullCounters != null) {
+ // Already constructed. Just return.
+ return;
+ }
+ this.constructFinalFullcounters();
+ }
+ }
+
+ @Private
+ public void constructFinalFullcounters() {
+ this.fullCounters = new Counters();
+ this.finalMapCounters = new Counters();
+ this.finalReduceCounters = new Counters();
+ this.fullCounters.incrAllCounters(jobCounters);
+ for (Task t : this.tasks.values()) {
+ Counters counters = t.getCounters();
+ switch (t.getType()) {
+ case MAP:
+ this.finalMapCounters.incrAllCounters(counters);
+ break;
+ case REDUCE:
+ this.finalReduceCounters.incrAllCounters(counters);
+ break;
+ }
+ this.fullCounters.incrAllCounters(counters);
+ }
+ }
+
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
private static class KillNewJobTransition
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Feb 7 23:07:56 2012
@@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+
import java.util.Iterator;
import junit.framework.Assert;
@@ -35,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.junit.Test;
/**
@@ -175,6 +180,41 @@ public class TestMRApp {
app.waitForState(job, JobState.ERROR);
}
+ private final class MRAppWithSpiedJob extends MRApp {
+ private JobImpl spiedJob;
+
+ private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
+ String testName, boolean cleanOnStart) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
+ }
+
+ @Override
+ protected Job createJob(Configuration conf) {
+ spiedJob = spy((JobImpl) super.createJob(conf));
+ ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
+ return spiedJob;
+ }
+
+ JobImpl getSpiedJob() {
+ return this.spiedJob;
+ }
+ }
+
+ @Test
+ public void testCountersOnJobFinish() throws Exception {
+ MRAppWithSpiedJob app =
+ new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
+ JobImpl job = (JobImpl)app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ System.out.println(job.getAllCounters());
+ // Just call getCounters
+ job.getAllCounters();
+ job.getAllCounters();
+ // Should be called only once
+ verify(job, times(1)).constructFinalFullcounters();
+ }
+
@Test
public void checkJobStateTypeConversion() {
//verify that all states can be converted without
@@ -200,5 +240,6 @@ public class TestMRApp {
t.testCommitPending();
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
+ t.testCountersOnJobFinish();
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Feb 7 23:07:56 2012
@@ -18,48 +18,40 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.junit.Test;
import org.junit.Assert;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.any;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
+import org.junit.Test;
/**
* Tests various functions of the JobImpl class
*/
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestJobImpl {
@Test
@@ -106,7 +98,9 @@ public class TestJobImpl {
"for successful job",
JobImpl.checkJobCompleteSuccess(mockJob));
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+ JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+
+
}
@Test
@@ -139,6 +133,7 @@ public class TestJobImpl {
t.testJobNoTasksTransition();
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
+ t.testCheckAccess();
}
@Test