You are viewing a plain-text version of this content. The canonical version is available at its original link, which is not preserved in this plain-text rendering.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/03/03 22:55:04 UTC

crunch git commit: CRUNCH-502: Fix interface/context differences in the CrunchOutputFormat wrapper code

Repository: crunch
Updated Branches:
  refs/heads/master 04e02516a -> 8c35a1399


CRUNCH-502: Fix interface/context differences in the CrunchOutputFormat wrapper code


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8c35a139
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8c35a139
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8c35a139

Branch: refs/heads/master
Commit: 8c35a13996eba01671ce7c7c6f369b180bdd408b
Parents: 04e0251
Author: Josh Wills <jw...@apache.org>
Authored: Mon Mar 2 14:35:02 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Mar 3 13:16:08 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/io/CrunchOutputs.java | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8c35a139/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 57fe139..0d06931 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -18,13 +18,11 @@
 package org.apache.crunch.io;
 
 import com.google.common.collect.Sets;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -46,7 +44,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -82,7 +79,7 @@ public class CrunchOutputs<K, V> {
       String namedOutput = e.getKey();
       Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
       OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      fmt.checkOutputSpecs(jc);
+      fmt.checkOutputSpecs(job);
     }
   }
 
@@ -93,8 +90,7 @@ public class CrunchOutputs<K, V> {
       String namedOutput = e.getKey();
       Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
       OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-          job.getConfiguration(), tac.getTaskAttemptID());
+      TaskAttemptContext taskContext = getTaskContext(tac, job);
       OutputCommitter oc = fmt.getOutputCommitter(taskContext);
       committers.put(namedOutput, oc);
     }
@@ -204,7 +200,6 @@ public class CrunchOutputs<K, V> {
 
     if (baseContext != null) {
       taskContext = getTaskContext(baseContext, job);
-
       recordWriter = fmt.getRecordWriter(taskContext);
     }
     OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
@@ -371,16 +366,16 @@ public class CrunchOutputs<K, V> {
       Set<Path> handledPaths = Sets.newHashSet();
       for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
         OutputCommitter oc = e.getValue();
+        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
         if (oc instanceof FileOutputCommitter) {
-          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
-          if (handledPaths.contains(workPath)) {
+          Path outputPath = ((FileOutputCommitter) oc).getWorkPath().getParent();
+          if (handledPaths.contains(outputPath)) {
             continue;
           } else {
-            handledPaths.add(workPath);
+            handledPaths.add(outputPath);
           }
         }
-        Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
-        configureJob(e.getKey(), job, outputs.get(e.getKey()));
         oc.commitJob(job);
       }
     }