You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/02/08 00:07:57 UTC

svn commit: r1241693 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...

Author: sseth
Date: Tue Feb  7 23:07:56 2012
New Revision: 1241693

URL: http://svn.apache.org/viewvc?rev=1241693&view=rev
Log:
merge <JIRA> from trunk

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Feb  7 23:07:56 2012
@@ -711,6 +711,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3826. Fixed a bug in RM web-ui which broke sorting. (Jonathan
     Eagles via acmurthy)
 
+    MAPREDUCE-3823. Ensure counters are calculated only once after a job
+    finishes. (Vinod Kumar Vavilapalli via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Feb  7 23:07:56 2012
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -106,7 +107,7 @@ import org.apache.hadoop.yarn.state.Stat
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, 
   EventHandler<JobEvent> {
 
@@ -153,6 +154,10 @@ public class JobImpl implements org.apac
   private boolean lazyTasksCopyNeeded = false;
   volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
   private Counters jobCounters = new Counters();
+  private Object fullCountersLock = new Object();
+  private Counters fullCounters = null;
+  private Counters finalMapCounters = null;
+  private Counters finalReduceCounters = null;
     // FIXME:  
     //
     // Can then replace task-level uber counters (MR-2424) with job-level ones
@@ -473,11 +478,21 @@ public class JobImpl implements org.apac
 
   @Override
   public Counters getAllCounters() {
-    Counters counters = new Counters();
+
     readLock.lock();
+
     try {
+      JobState state = getState();
+      if (state == JobState.ERROR || state == JobState.FAILED
+          || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+        this.mayBeConstructFinalFullCounters();
+        return fullCounters;
+      }
+
+      Counters counters = new Counters();
       counters.incrAllCounters(jobCounters);
       return incrTaskCounters(counters, tasks.values());
+
     } finally {
       readLock.unlock();
     }
@@ -525,17 +540,21 @@ public class JobImpl implements org.apac
     try {
       JobState state = getState();
 
+      // jobFile can be null if the job is not yet inited.
+      String jobFile =
+          remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
+
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
-            cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
+            cleanupProgress, jobFile, amInfos, isUber);
       }
 
       computeProgress();
       return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
           appSubmitTime, startTime, finishTime, setupProgress,
           this.mapProgress, this.reduceProgress,
-          cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
+          cleanupProgress, jobFile, amInfos, isUber);
     } finally {
       readLock.unlock();
     }
@@ -1143,26 +1162,49 @@ public class JobImpl implements org.apac
   // not be generated for KilledJobs, etc.
   private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
 
-    Counters mapCounters = new Counters();
-    Counters reduceCounters = new Counters();
-    for (Task t : job.tasks.values()) {
-      Counters counters = t.getCounters();
-      switch (t.getType()) {
-        case MAP:     mapCounters.incrAllCounters(counters);     break;
-        case REDUCE:  reduceCounters.incrAllCounters(counters);  break;
-      }
-    }
+    job.mayBeConstructFinalFullCounters();
 
     JobFinishedEvent jfe = new JobFinishedEvent(
         job.oldJobId, job.finishTime,
         job.succeededMapTaskCount, job.succeededReduceTaskCount,
         job.failedMapTaskCount, job.failedReduceTaskCount,
-        mapCounters,
-        reduceCounters,
-        job.getAllCounters());
+        job.finalMapCounters,
+        job.finalReduceCounters,
+        job.fullCounters);
     return jfe;
   }
 
+  private void mayBeConstructFinalFullCounters() {
+    // Calculating full-counters. This should happen only once for the job.
+    synchronized (this.fullCountersLock) {
+      if (this.fullCounters != null) {
+        // Already constructed. Just return.
+        return;
+      }
+      this.constructFinalFullcounters();
+    }
+  }
+
+  @Private
+  public void constructFinalFullcounters() {
+    this.fullCounters = new Counters();
+    this.finalMapCounters = new Counters();
+    this.finalReduceCounters = new Counters();
+    this.fullCounters.incrAllCounters(jobCounters);
+    for (Task t : this.tasks.values()) {
+      Counters counters = t.getCounters();
+      switch (t.getType()) {
+      case MAP:
+        this.finalMapCounters.incrAllCounters(counters);
+        break;
+      case REDUCE:
+        this.finalReduceCounters.incrAllCounters(counters);
+        break;
+      }
+      this.fullCounters.incrAllCounters(counters);
+    }
+  }
+
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
   private static class KillNewJobTransition

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Feb  7 23:07:56 2012
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -35,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.junit.Test;
 
 /**
@@ -175,6 +180,41 @@ public class TestMRApp {
     app.waitForState(job, JobState.ERROR);
   }
 
+  private final class MRAppWithSpiedJob extends MRApp {
+    private JobImpl spiedJob;
+
+    private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      spiedJob = spy((JobImpl) super.createJob(conf));
+      ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
+      return spiedJob;
+    }
+
+    JobImpl getSpiedJob() {
+      return this.spiedJob;
+    }
+  }
+
+  @Test
+  public void testCountersOnJobFinish() throws Exception {
+    MRAppWithSpiedJob app =
+        new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    System.out.println(job.getAllCounters());
+    // Just call getCounters
+    job.getAllCounters();
+    job.getAllCounters();
+    // Should be called only once
+    verify(job, times(1)).constructFinalFullcounters();
+  }
+
   @Test
   public void checkJobStateTypeConversion() {
     //verify that all states can be converted without 
@@ -200,5 +240,6 @@ public class TestMRApp {
     t.testCommitPending();
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();
+    t.testCountersOnJobFinish();
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1241693&r1=1241692&r2=1241693&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Feb  7 23:07:56 2012
@@ -18,48 +18,40 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.junit.Test;
 import org.junit.Assert;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.any;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
+import org.junit.Test;
 
 
 /**
  * Tests various functions of the JobImpl class
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestJobImpl {
   
   @Test
@@ -106,7 +98,9 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);    
+        JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+
+    
   }
 
   @Test
@@ -139,6 +133,7 @@ public class TestJobImpl {
     t.testJobNoTasksTransition();
     t.testCheckJobCompleteSuccess();
     t.testCheckJobCompleteSuccessFailed();
+    t.testCheckAccess();
   }
 
   @Test