You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/29 20:24:14 UTC

git commit: CRUNCH-271: Cache Counters immediately upon Job completion

Updated Branches:
  refs/heads/master 16d4d35f7 -> af3df548a


CRUNCH-271: Cache Counters immediately upon Job completion


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/af3df548
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/af3df548
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/af3df548

Branch: refs/heads/master
Commit: af3df548a3d8c70cf5b4849f5e73e8a7fbe67f3c
Parents: 16d4d35
Author: Josh Wills <jw...@apache.org>
Authored: Sat Sep 28 18:56:14 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Sep 28 19:37:24 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/PipelineResult.java  | 21 +++++++++++++++++++-
 .../lib/jobcontrol/CrunchControlledJob.java     |  7 +++++++
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  2 +-
 3 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index bd29999..f98f305 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -18,6 +18,7 @@
 package org.apache.crunch;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.Map;
@@ -45,7 +46,7 @@ public class PipelineResult {
       this(stageName, stageName, counters);
     }
 
-    public StageResult(String stageName, String stageId, Counters counters){
+    public StageResult(String stageName, String stageId, Counters counters) {
       this.stageName = stageName;
       this.stageId = stageId;
       this.counters = counters;
@@ -73,6 +74,9 @@ public class PipelineResult {
      * @return a map of group names to counter names.
      */
     public Map<String, Set<String>> getCounterNames() {
+      if (counters == null) {
+        return ImmutableMap.of();
+      }
       Map<String, Set<String>> names = Maps.newHashMap();
       for (CounterGroup counterGroup : counters) {
         Set<String> counterNames = Sets.newHashSet();
@@ -91,22 +95,37 @@ public class PipelineResult {
      */
     @Deprecated
     public Counter findCounter(Enum<?> key) {
+      if (counters == null) {
+        return null;
+      }
       return counters.findCounter(key);
     }
 
     public long getCounterValue(String groupName, String counterName) {
+      if (counters == null) {
+        return 0L;
+      }
       return counters.findCounter(groupName, counterName).getValue();
     }
 
     public String getCounterDisplayName(String groupName, String counterName) {
+      if (counters == null) {
+        return null;
+      }
       return counters.findCounter(groupName, counterName).getDisplayName();
     }
 
     public long getCounterValue(Enum<?> key) {
+      if (counters == null) {
+        return 0L;
+      }
       return counters.findCounter(key).getValue();
     }
 
     public String getCounterDisplayName(Enum<?> key) {
+      if (counters == null) {
+        return null;
+      }
       return counters.findCounter(key).getDisplayName();
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 0038ab7..ab46263 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.StringUtils;
@@ -60,6 +61,7 @@ public class CrunchControlledJob implements MRJob {
   // some info for human consumption, e.g. the reason why the job failed
   private String message;
   private String lastKnownProgress;
+  private Counters counters;
 
   /**
    * Construct a job.
@@ -136,6 +138,10 @@ public class CrunchControlledJob implements MRJob {
     return this.job.getJobID();
   }
 
+  public Counters getCounters() {
+    return counters;
+  }
+
   @Override
   public synchronized Job getJob() {
     return this.job;
@@ -225,6 +231,7 @@ public class CrunchControlledJob implements MRJob {
   private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
+        this.counters = job.getCounters();
         if (job.isSuccessful()) {
           this.state = State.SUCCESS;
         } else {

http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index e223e5f..532e37c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -112,7 +112,7 @@ public class MRExecutor implements MRPipelineExecution {
       }
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getJob().getCounters()));
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters()));
       }
 
       for (PCollectionImpl<?> c : outputTargets.keySet()) {