You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and was not preserved in this plain-text version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/29 20:24:14 UTC
git commit: CRUNCH-271: Cache Counters immediately upon Job completion
Updated Branches:
refs/heads/master 16d4d35f7 -> af3df548a
CRUNCH-271: Cache Counters immediately upon Job completion
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/af3df548
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/af3df548
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/af3df548
Branch: refs/heads/master
Commit: af3df548a3d8c70cf5b4849f5e73e8a7fbe67f3c
Parents: 16d4d35
Author: Josh Wills <jw...@apache.org>
Authored: Sat Sep 28 18:56:14 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Sep 28 19:37:24 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/crunch/PipelineResult.java | 21 +++++++++++++++++++-
.../lib/jobcontrol/CrunchControlledJob.java | 7 +++++++
.../apache/crunch/impl/mr/exec/MRExecutor.java | 2 +-
3 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index bd29999..f98f305 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -18,6 +18,7 @@
package org.apache.crunch;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
@@ -45,7 +46,7 @@ public class PipelineResult {
this(stageName, stageName, counters);
}
- public StageResult(String stageName, String stageId, Counters counters){
+ public StageResult(String stageName, String stageId, Counters counters) {
this.stageName = stageName;
this.stageId = stageId;
this.counters = counters;
@@ -73,6 +74,9 @@ public class PipelineResult {
* @return a map of group names to counter names.
*/
public Map<String, Set<String>> getCounterNames() {
+ if (counters == null) {
+ return ImmutableMap.of();
+ }
Map<String, Set<String>> names = Maps.newHashMap();
for (CounterGroup counterGroup : counters) {
Set<String> counterNames = Sets.newHashSet();
@@ -91,22 +95,37 @@ public class PipelineResult {
*/
@Deprecated
public Counter findCounter(Enum<?> key) {
+ if (counters == null) {
+ return null;
+ }
return counters.findCounter(key);
}
public long getCounterValue(String groupName, String counterName) {
+ if (counters == null) {
+ return 0L;
+ }
return counters.findCounter(groupName, counterName).getValue();
}
public String getCounterDisplayName(String groupName, String counterName) {
+ if (counters == null) {
+ return null;
+ }
return counters.findCounter(groupName, counterName).getDisplayName();
}
public long getCounterValue(Enum<?> key) {
+ if (counters == null) {
+ return 0L;
+ }
return counters.findCounter(key).getValue();
}
public String getCounterDisplayName(Enum<?> key) {
+ if (counters == null) {
+ return null;
+ }
return counters.findCounter(key).getDisplayName();
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 0038ab7..ab46263 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.impl.mr.MRJob;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
@@ -60,6 +61,7 @@ public class CrunchControlledJob implements MRJob {
// some info for human consumption, e.g. the reason why the job failed
private String message;
private String lastKnownProgress;
+ private Counters counters;
/**
* Construct a job.
@@ -136,6 +138,10 @@ public class CrunchControlledJob implements MRJob {
return this.job.getJobID();
}
+ public Counters getCounters() {
+ return counters;
+ }
+
@Override
public synchronized Job getJob() {
return this.job;
@@ -225,6 +231,7 @@ public class CrunchControlledJob implements MRJob {
private void checkRunningState() throws IOException, InterruptedException {
try {
if (job.isComplete()) {
+ this.counters = job.getCounters();
if (job.isSuccessful()) {
this.state = State.SUCCESS;
} else {
http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index e223e5f..532e37c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -112,7 +112,7 @@ public class MRExecutor implements MRPipelineExecution {
}
List<PipelineResult.StageResult> stages = Lists.newArrayList();
for (CrunchControlledJob job : control.getSuccessfulJobList()) {
- stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getJob().getCounters()));
+ stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters()));
}
for (PCollectionImpl<?> c : outputTargets.keySet()) {