You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/12/05 00:52:15 UTC
crunch git commit: CRUNCH-481: Support independent output committers for multiple outputs
Repository: crunch
Updated Branches:
refs/heads/master 2c7821fd3 -> d4f23c42c
CRUNCH-481: Support independent output committers for multiple outputs
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d4f23c42
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d4f23c42
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d4f23c42
Branch: refs/heads/master
Commit: d4f23c42c6d215a07ecfea74b1b6cddbc1537eeb
Parents: 2c7821f
Author: Josh Wills <jw...@apache.org>
Authored: Sat Nov 29 16:21:15 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Nov 29 16:21:34 2014 -0800
----------------------------------------------------------------------
.../crunch/impl/mr/plan/JobPrototype.java | 2 +
.../crunch/impl/mr/run/CrunchOutputFormat.java | 54 ++++
.../org/apache/crunch/io/CrunchOutputs.java | 259 +++++++++++++++----
3 files changed, 260 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d341184..2863e00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
import org.apache.crunch.impl.mr.run.CrunchCombiner;
import org.apache.crunch.impl.mr.run.CrunchInputFormat;
import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
import org.apache.crunch.impl.mr.run.CrunchReducer;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.impl.mr.run.RTNode;
@@ -214,6 +215,7 @@ class JobPrototype {
job.setNumReduceTasks(0);
inputNodes = Lists.newArrayList(outputNodes);
}
+ job.setOutputFormatClass(CrunchOutputFormat.class);
serialize(inputNodes, conf, workingPath, NodeContext.MAP);
if (inputNodes.size() == 1) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
new file mode 100644
index 0000000..bd9cdc9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
+ @Override
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new RecordWriter<K, V>() {
+ @Override
+ public void write(K k, V v) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ CrunchOutputs.checkOutputSpecs(jobContext);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return CrunchOutputs.getOutputCommitter(taskAttemptContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d4f23c42/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 55fcc89..a536b38 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,14 +17,20 @@
*/
package org.apache.crunch.io;
+import com.google.common.collect.Sets;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.base.Joiner;
@@ -35,6 +41,7 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -63,7 +70,32 @@ public class CrunchOutputs<K, V> {
String existing = conf.get(CRUNCH_OUTPUTS);
conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
}
-
+
+ public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+ Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
+ for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+ String namedOutput = e.getKey();
+ Job job = getJob(e.getKey(), jc.getConfiguration());
+ OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+ fmt.checkOutputSpecs(jc);
+ }
+ }
+
+ public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
+ Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
+ Map<String, OutputCommitter> committers = Maps.newHashMap();
+ for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+ String namedOutput = e.getKey();
+ Job job = getJob(e.getKey(), tac.getConfiguration());
+ OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+ TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+ job.getConfiguration(), tac.getTaskAttemptID());
+ OutputCommitter oc = fmt.getOutputCommitter(taskContext);
+ committers.put(namedOutput, oc);
+ }
+ return new CompositeOutputCommitter(outputs, committers);
+ }
+
private static class OutputConfig<K, V> {
public FormatBundle<OutputFormat<K, V>> bundle;
public Class<K> keyClass;
@@ -77,11 +109,13 @@ public class CrunchOutputs<K, V> {
}
}
- private static Map<String, OutputConfig> getNamedOutputs(
- TaskInputOutputContext<?, ?, ?, ?> context) {
+ private static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
Map<String, OutputConfig> out = Maps.newHashMap();
- Configuration conf = context.getConfiguration();
- for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
+ String serOut = conf.get(CRUNCH_OUTPUTS);
+ if (serOut == null || serOut.isEmpty()) {
+ return out;
+ }
+ for (String input : Splitter.on(RECORD_SEP).split(serOut)) {
List<String> fields = Lists.newArrayList(SPLITTER.split(input));
String name = fields.get(0);
FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf);
@@ -99,10 +133,10 @@ public class CrunchOutputs<K, V> {
private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
- private final TaskInputOutputContext<?, ?, K, V> baseContext;
+ private TaskInputOutputContext<?, ?, K, V> baseContext;
+ private Configuration baseConf;
private final Map<String, OutputConfig> namedOutputs;
- private final Map<String, RecordWriter<K, V>> recordWriters;
- private final Map<String, TaskAttemptContext> taskContextCache;
+ private final Map<String, OutputState<K, V>> outputStates;
private final boolean disableOutputCounters;
/**
@@ -112,13 +146,17 @@ public class CrunchOutputs<K, V> {
* @param context the TaskInputOutputContext object
*/
public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+ this(context.getConfiguration());
this.baseContext = context;
- namedOutputs = getNamedOutputs(context);
- recordWriters = Maps.newHashMap();
- taskContextCache = Maps.newHashMap();
- this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
}
-
+
+ public CrunchOutputs(Configuration conf) {
+ this.baseConf = conf;
+ this.namedOutputs = getNamedOutputs(conf);
+ this.outputStates = Maps.newHashMap();
+ this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+ }
+
@SuppressWarnings("unchecked")
public void write(String namedOutput, K key, V value)
throws IOException, InterruptedException {
@@ -126,63 +164,174 @@ public class CrunchOutputs<K, V> {
throw new IllegalArgumentException("Undefined named output '" +
namedOutput + "'");
}
- TaskAttemptContext taskContext = getContext(namedOutput);
if (!disableOutputCounters) {
baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
}
- getRecordWriter(taskContext, namedOutput).write(key, value);
+ getOutputState(namedOutput).write(key, value);
}
public void close() throws IOException, InterruptedException {
- for (RecordWriter<?, ?> writer : recordWriters.values()) {
- writer.close(baseContext);
+ for (OutputState<?, ?> out : outputStates.values()) {
+ out.close();
}
}
- private TaskAttemptContext getContext(String nameOutput) throws IOException {
- TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
- if (taskContext != null) {
- return taskContext;
+ private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
+ OutputState<?, ?> out = outputStates.get(namedOutput);
+ if (out != null) {
+ return (OutputState<K, V>) out;
}
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
- OutputConfig outConfig = namedOutputs.get(nameOutput);
- Configuration conf = new Configuration(baseContext.getConfiguration());
- Job job = new Job(conf);
- job.getConfiguration().set("crunch.namedoutput", nameOutput);
- job.setOutputFormatClass(outConfig.bundle.getFormatClass());
- job.setOutputKeyClass(outConfig.keyClass);
- job.setOutputValueClass(outConfig.valueClass);
- outConfig.bundle.configure(job.getConfiguration());
- taskContext = TaskAttemptContextFactory.create(
- job.getConfiguration(), baseContext.getTaskAttemptID());
-
- taskContextCache.put(nameOutput, taskContext);
- return taskContext;
+ Job job = getJob(namedOutput, baseConf);
+ OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
+ TaskAttemptContext taskContext = null;
+ RecordWriter<K, V> recordWriter = null;
+ if (baseContext != null) {
+ taskContext = TaskAttemptContextFactory.create(
+ job.getConfiguration(), baseContext.getTaskAttemptID());
+ recordWriter = fmt.getRecordWriter(taskContext);
+ }
+ OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
+ this.outputStates.put(namedOutput, outputState);
+ return outputState;
}
-
- private synchronized RecordWriter<K, V> getRecordWriter(
- TaskAttemptContext taskContext, String namedOutput)
- throws IOException, InterruptedException {
- // look for record-writer in the cache
- RecordWriter<K, V> writer = recordWriters.get(namedOutput);
-
- // If not in cache, create a new one
- if (writer == null) {
- // get the record writer from context output format
- taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
- try {
- OutputFormat format = ReflectionUtils.newInstance(
- taskContext.getOutputFormatClass(),
- taskContext.getConfiguration());
- writer = format.getRecordWriter(taskContext);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
+
+ private static Job getJob(String namedOutput, Configuration baseConf) throws IOException {
+ Job job = new Job(new Configuration(baseConf));
+ job.getConfiguration().set("crunch.namedoutput", namedOutput);
+ return job;
+ }
+
+ private static void configureJob(
+ String namedOutput,
+ Job job,
+ OutputConfig outConfig) throws IOException {
+ job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+ job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+ job.setOutputKeyClass(outConfig.keyClass);
+ job.setOutputValueClass(outConfig.valueClass);
+ outConfig.bundle.configure(job.getConfiguration());
+ }
+
+ private static OutputFormat getOutputFormat(
+ String namedOutput,
+ Job job,
+ OutputConfig outConfig) throws IOException {
+ configureJob(namedOutput, job, outConfig);
+ try {
+ return ReflectionUtils.newInstance(
+ job.getOutputFormatClass(),
+ job.getConfiguration());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class OutputState<K, V> {
+ private final TaskAttemptContext context;
+ private final RecordWriter<K, V> recordWriter;
+
+ public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
+ this.context = context;
+ this.recordWriter = recordWriter;
+ }
+
+ public void write(K key, V value) throws IOException, InterruptedException {
+ recordWriter.write(key, value);
+ }
+
+ public void close() throws IOException, InterruptedException {
+ recordWriter.close(context);
+ }
+ }
+
+ private static class CompositeOutputCommitter extends OutputCommitter {
+
+ private final Map<String, OutputConfig> outputs;
+ private final Map<String, OutputCommitter> committers;
+
+ public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
+ this.outputs = outputs;
+ this.committers = committers;
+ }
+
+ private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
+ Job job = getJob(namedOutput, baseContext.getConfiguration());
+ configureJob(namedOutput, job, outputs.get(namedOutput));
+ return TaskAttemptContextFactory.create(job.getConfiguration(), baseContext.getTaskAttemptID());
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ Job job = getJob(e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ e.getValue().setupJob(job);
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
+ }
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
+ }
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ Set<Path> handledPaths = Sets.newHashSet();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ OutputCommitter oc = e.getValue();
+ if (oc instanceof FileOutputCommitter) {
+ Path workPath = ((FileOutputCommitter) oc).getWorkPath();
+ if (handledPaths.contains(workPath)) {
+ continue;
+ } else {
+ handledPaths.add(workPath);
+ }
+ }
+ Job job = getJob(e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ oc.commitJob(job);
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ Job job = getJob(e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ e.getValue().abortJob(job, state);
}
- recordWriters.put(namedOutput, writer);
}
-
- return writer;
}
}