You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/03/03 22:55:04 UTC
crunch git commit: CRUNCH-502: Fix interface/context differences in the CrunchOutputFormat wrapper code
Repository: crunch
Updated Branches:
refs/heads/master 04e02516a -> 8c35a1399
CRUNCH-502: Fix interface/context differences in the CrunchOutputFormat wrapper code
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8c35a139
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8c35a139
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8c35a139
Branch: refs/heads/master
Commit: 8c35a13996eba01671ce7c7c6f369b180bdd408b
Parents: 04e0251
Author: Josh Wills <jw...@apache.org>
Authored: Mon Mar 2 14:35:02 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Mar 3 13:16:08 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/io/CrunchOutputs.java | 19 +++++++------------
1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/8c35a139/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 57fe139..0d06931 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -18,13 +18,11 @@
package org.apache.crunch.io;
import com.google.common.collect.Sets;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -46,7 +44,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
/**
* An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -82,7 +79,7 @@ public class CrunchOutputs<K, V> {
String namedOutput = e.getKey();
Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
- fmt.checkOutputSpecs(jc);
+ fmt.checkOutputSpecs(job);
}
}
@@ -93,8 +90,7 @@ public class CrunchOutputs<K, V> {
String namedOutput = e.getKey();
Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
- TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
- job.getConfiguration(), tac.getTaskAttemptID());
+ TaskAttemptContext taskContext = getTaskContext(tac, job);
OutputCommitter oc = fmt.getOutputCommitter(taskContext);
committers.put(namedOutput, oc);
}
@@ -204,7 +200,6 @@ public class CrunchOutputs<K, V> {
if (baseContext != null) {
taskContext = getTaskContext(baseContext, job);
-
recordWriter = fmt.getRecordWriter(taskContext);
}
OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
@@ -371,16 +366,16 @@ public class CrunchOutputs<K, V> {
Set<Path> handledPaths = Sets.newHashSet();
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
OutputCommitter oc = e.getValue();
+ Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
if (oc instanceof FileOutputCommitter) {
- Path workPath = ((FileOutputCommitter) oc).getWorkPath();
- if (handledPaths.contains(workPath)) {
+ Path outputPath = ((FileOutputCommitter) oc).getWorkPath().getParent();
+ if (handledPaths.contains(outputPath)) {
continue;
} else {
- handledPaths.add(workPath);
+ handledPaths.add(outputPath);
}
}
- Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
- configureJob(e.getKey(), job, outputs.get(e.getKey()));
oc.commitJob(job);
}
}