You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/12/05 00:52:15 UTC

crunch git commit: CRUNCH-481: Support independent output committers for multiple outputs

Repository: crunch
Updated Branches:
  refs/heads/master 2c7821fd3 -> d4f23c42c


CRUNCH-481: Support independent output committers for multiple outputs


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d4f23c42
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d4f23c42
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d4f23c42

Branch: refs/heads/master
Commit: d4f23c42c6d215a07ecfea74b1b6cddbc1537eeb
Parents: 2c7821f
Author: Josh Wills <jw...@apache.org>
Authored: Sat Nov 29 16:21:15 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Nov 29 16:21:34 2014 -0800

----------------------------------------------------------------------
 .../crunch/impl/mr/plan/JobPrototype.java       |   2 +
 .../crunch/impl/mr/run/CrunchOutputFormat.java  |  54 ++++
 .../org/apache/crunch/io/CrunchOutputs.java     | 259 +++++++++++++++----
 3 files changed, 260 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d341184..2863e00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
@@ -214,6 +215,7 @@ class JobPrototype {
       job.setNumReduceTasks(0);
       inputNodes = Lists.newArrayList(outputNodes);
     }
+    job.setOutputFormatClass(CrunchOutputFormat.class);
     serialize(inputNodes, conf, workingPath, NodeContext.MAP);
 
     if (inputNodes.size() == 1) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
new file mode 100644
index 0000000..bd9cdc9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new RecordWriter<K, V>() {
+      @Override
+      public void write(K k, V v) throws IOException, InterruptedException {
+      }
+
+      @Override
+      public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+    CrunchOutputs.checkOutputSpecs(jobContext);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return CrunchOutputs.getOutputCommitter(taskAttemptContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 55fcc89..a536b38 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,14 +17,20 @@
  */
 package org.apache.crunch.io;
 
+import com.google.common.collect.Sets;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Joiner;
@@ -35,6 +41,7 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -63,7 +70,32 @@ public class CrunchOutputs<K, V> {
     String existing = conf.get(CRUNCH_OUTPUTS);
     conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
   }
-  
+
+  public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(e.getKey(), jc.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      fmt.checkOutputSpecs(jc);
+    }
+  }
+
+  public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
+    Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
+    Map<String, OutputCommitter> committers = Maps.newHashMap();
+    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+      String namedOutput = e.getKey();
+      Job job = getJob(e.getKey(), tac.getConfiguration());
+      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+          job.getConfiguration(), tac.getTaskAttemptID());
+      OutputCommitter oc = fmt.getOutputCommitter(taskContext);
+      committers.put(namedOutput, oc);
+    }
+    return new CompositeOutputCommitter(outputs, committers);
+  }
+
   private static class OutputConfig<K, V> {
     public FormatBundle<OutputFormat<K, V>> bundle;
     public Class<K> keyClass;
@@ -77,11 +109,13 @@ public class CrunchOutputs<K, V> {
     }
   }
   
-  private static Map<String, OutputConfig> getNamedOutputs(
-      TaskInputOutputContext<?, ?, ?, ?> context) {
+  private static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
     Map<String, OutputConfig> out = Maps.newHashMap();
-    Configuration conf = context.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
+    String serOut = conf.get(CRUNCH_OUTPUTS);
+    if (serOut == null || serOut.isEmpty()) {
+      return out;
+    }
+    for (String input : Splitter.on(RECORD_SEP).split(serOut)) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       String name = fields.get(0);
       FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf);
@@ -99,10 +133,10 @@ public class CrunchOutputs<K, V> {
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
   private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
 
-  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private TaskInputOutputContext<?, ?, K, V> baseContext;
+  private Configuration baseConf;
   private final Map<String, OutputConfig> namedOutputs;
-  private final Map<String, RecordWriter<K, V>> recordWriters;
-  private final Map<String, TaskAttemptContext> taskContextCache;
+  private final Map<String, OutputState<K, V>> outputStates;
   private final boolean disableOutputCounters;
 
   /**
@@ -112,13 +146,17 @@ public class CrunchOutputs<K, V> {
    * @param context the TaskInputOutputContext object
    */
   public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this(context.getConfiguration());
     this.baseContext = context;
-    namedOutputs = getNamedOutputs(context);
-    recordWriters = Maps.newHashMap();
-    taskContextCache = Maps.newHashMap();
-    this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
   }
-  
+
+  public CrunchOutputs(Configuration conf) {
+    this.baseConf = conf;
+    this.namedOutputs = getNamedOutputs(conf);
+    this.outputStates = Maps.newHashMap();
+    this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+  }
+
   @SuppressWarnings("unchecked")
   public void write(String namedOutput, K key, V value)
       throws IOException, InterruptedException {
@@ -126,63 +164,174 @@ public class CrunchOutputs<K, V> {
       throw new IllegalArgumentException("Undefined named output '" +
         namedOutput + "'");
     }
-    TaskAttemptContext taskContext = getContext(namedOutput);
     if (!disableOutputCounters) {
       baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
     }
-    getRecordWriter(taskContext, namedOutput).write(key, value);
+    getOutputState(namedOutput).write(key, value);
   }
   
   public void close() throws IOException, InterruptedException {
-    for (RecordWriter<?, ?> writer : recordWriters.values()) {
-      writer.close(baseContext);
+    for (OutputState<?, ?> out : outputStates.values()) {
+      out.close();
     }
   }
   
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
-    if (taskContext != null) {
-      return taskContext;
+  private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
+    OutputState<?, ?> out = outputStates.get(namedOutput);
+    if (out != null) {
+      return (OutputState<K, V>) out;
     }
 
     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    OutputConfig outConfig = namedOutputs.get(nameOutput);
-    Configuration conf = new Configuration(baseContext.getConfiguration());
-    Job job = new Job(conf);
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
-    job.setOutputKeyClass(outConfig.keyClass);
-    job.setOutputValueClass(outConfig.valueClass);
-    outConfig.bundle.configure(job.getConfiguration());
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), baseContext.getTaskAttemptID());
-
-    taskContextCache.put(nameOutput, taskContext);
-    return taskContext;
+    Job job = getJob(namedOutput, baseConf);
+    OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
+    TaskAttemptContext taskContext = null;
+    RecordWriter<K, V> recordWriter = null;
+    if (baseContext != null) {
+      taskContext = TaskAttemptContextFactory.create(
+          job.getConfiguration(), baseContext.getTaskAttemptID());
+      recordWriter = fmt.getRecordWriter(taskContext);
+    }
+    OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
+    this.outputStates.put(namedOutput, outputState);
+    return outputState;
   }
-  
-  private synchronized RecordWriter<K, V> getRecordWriter(
-      TaskAttemptContext taskContext, String namedOutput) 
-      throws IOException, InterruptedException {
-    // look for record-writer in the cache
-    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
-    
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      try {
-        OutputFormat format = ReflectionUtils.newInstance(
-            taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration());
-        writer = format.getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
+
+  private static Job getJob(String namedOutput, Configuration baseConf) throws IOException {
+    Job job = new Job(new Configuration(baseConf));
+    job.getConfiguration().set("crunch.namedoutput", namedOutput);
+    return job;
+  }
+
+  private static void configureJob(
+    String namedOutput,
+    Job job,
+    OutputConfig outConfig) throws IOException {
+      job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+      job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+      job.setOutputKeyClass(outConfig.keyClass);
+      job.setOutputValueClass(outConfig.valueClass);
+      outConfig.bundle.configure(job.getConfiguration());
+    }
+
+  private static OutputFormat getOutputFormat(
+      String namedOutput,
+      Job job,
+      OutputConfig outConfig) throws IOException {
+    configureJob(namedOutput, job, outConfig);
+    try {
+      return ReflectionUtils.newInstance(
+          job.getOutputFormatClass(),
+          job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static class OutputState<K, V> {
+    private final TaskAttemptContext context;
+    private final RecordWriter<K, V> recordWriter;
+
+    public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
+      this.context = context;
+      this.recordWriter = recordWriter;
+    }
+
+    public void write(K key, V value) throws IOException, InterruptedException {
+      recordWriter.write(key, value);
+    }
+
+    public void close() throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+  }
+
+  private static class CompositeOutputCommitter extends OutputCommitter {
+
+    private final Map<String, OutputConfig> outputs;
+    private final Map<String, OutputCommitter> committers;
+
+    public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
+      this.outputs = outputs;
+      this.committers = committers;
+    }
+
+    private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
+      Job job = getJob(namedOutput, baseContext.getConfiguration());
+      configureJob(namedOutput, job, outputs.get(namedOutput));
+      return TaskAttemptContextFactory.create(job.getConfiguration(), baseContext.getTaskAttemptID());
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        Job job = getJob(e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().setupJob(job);
+      }
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
+      }
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      Set<Path> handledPaths = Sets.newHashSet();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        OutputCommitter oc = e.getValue();
+        if (oc instanceof FileOutputCommitter) {
+          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
+          if (handledPaths.contains(workPath)) {
+            continue;
+          } else {
+            handledPaths.add(workPath);
+          }
+        }
+        Job job = getJob(e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        oc.commitJob(job);
+      }
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+      Configuration conf = jobContext.getConfiguration();
+      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+        Job job = getJob(e.getKey(), conf);
+        configureJob(e.getKey(), job, outputs.get(e.getKey()));
+        e.getValue().abortJob(job, state);
       }
-      recordWriters.put(namedOutput, writer);
     }
-    
-    return writer;
   }
 }