You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by to...@apache.org on 2015/02/13 15:50:46 UTC
crunch git commit: CRUNCH-481. Support independent output committers
for multiple outputs.
Repository: crunch
Updated Branches:
refs/heads/master dbd56e638 -> fcf901cbb
CRUNCH-481. Support independent output committers for multiple outputs.
Re-added after this was inadvertently dropped, and updated to fix null
job ID.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fcf901cb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fcf901cb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fcf901cb
Branch: refs/heads/master
Commit: fcf901cbb437faf65cae6d1fb4c3513084fe4186
Parents: dbd56e6
Author: Tom White <to...@apache.org>
Authored: Fri Feb 13 14:48:38 2015 +0000
Committer: Tom White <to...@apache.org>
Committed: Fri Feb 13 14:48:38 2015 +0000
----------------------------------------------------------------------
.../crunch/impl/mr/plan/JobPrototype.java | 2 +
.../crunch/impl/mr/run/CrunchOutputFormat.java | 54 ++++
.../org/apache/crunch/io/CrunchOutputs.java | 292 ++++++++++++++++---
3 files changed, 306 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d341184..2863e00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,6 +34,7 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
import org.apache.crunch.impl.mr.run.CrunchCombiner;
import org.apache.crunch.impl.mr.run.CrunchInputFormat;
import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
import org.apache.crunch.impl.mr.run.CrunchReducer;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.impl.mr.run.RTNode;
@@ -214,6 +215,7 @@ class JobPrototype {
job.setNumReduceTasks(0);
inputNodes = Lists.newArrayList(outputNodes);
}
+ job.setOutputFormatClass(CrunchOutputFormat.class);
serialize(inputNodes, conf, workingPath, NodeContext.MAP);
if (inputNodes.size() == 1) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
new file mode 100644
index 0000000..bd9cdc9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
+ @Override
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new RecordWriter<K, V>() {
+ @Override
+ public void write(K k, V v) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ CrunchOutputs.checkOutputSpecs(jobContext);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return CrunchOutputs.getOutputCommitter(taskAttemptContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fcf901cb/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index e811bcf..57fe139 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,14 +17,24 @@
*/
package org.apache.crunch.io;
+import com.google.common.collect.Sets;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.base.Joiner;
@@ -35,6 +45,8 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
/**
* An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -64,6 +76,31 @@ public class CrunchOutputs<K, V> {
conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
}
+ public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+ Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
+ for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+ String namedOutput = e.getKey();
+ Job job = getJob(jc.getJobID(), e.getKey(), jc.getConfiguration());
+ OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+ fmt.checkOutputSpecs(jc);
+ }
+ }
+
+ public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
+ Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
+ Map<String, OutputCommitter> committers = Maps.newHashMap();
+ for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
+ String namedOutput = e.getKey();
+ Job job = getJob(tac.getJobID(), e.getKey(), tac.getConfiguration());
+ OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
+ TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+ job.getConfiguration(), tac.getTaskAttemptID());
+ OutputCommitter oc = fmt.getOutputCommitter(taskContext);
+ committers.put(namedOutput, oc);
+ }
+ return new CompositeOutputCommitter(outputs, committers);
+ }
+
public static class OutputConfig<K, V> {
public FormatBundle<OutputFormat<K, V>> bundle;
public Class<K> keyClass;
@@ -84,6 +121,10 @@ public class CrunchOutputs<K, V> {
public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
Map<String, OutputConfig> out = Maps.newHashMap();
+ String serOut = conf.get(CRUNCH_OUTPUTS);
+ if (serOut == null || serOut.isEmpty()) {
+ return out;
+ }
for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
List<String> fields = Lists.newArrayList(SPLITTER.split(input));
String name = fields.get(0);
@@ -101,10 +142,10 @@ public class CrunchOutputs<K, V> {
private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
- private final TaskInputOutputContext<?, ?, K, V> baseContext;
+ private TaskInputOutputContext<?, ?, K, V> baseContext;
+ private Configuration baseConf;
private final Map<String, OutputConfig> namedOutputs;
- private final Map<String, RecordWriter<K, V>> recordWriters;
- private final Map<String, TaskAttemptContext> taskContextCache;
+ private final Map<String, OutputState<K, V>> outputStates;
private final boolean disableOutputCounters;
/**
@@ -114,11 +155,15 @@ public class CrunchOutputs<K, V> {
* @param context the TaskInputOutputContext object
*/
public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+ this(context.getConfiguration());
this.baseContext = context;
- namedOutputs = getNamedOutputs(context);
- recordWriters = Maps.newHashMap();
- taskContextCache = Maps.newHashMap();
- this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+ }
+
+ public CrunchOutputs(Configuration conf) {
+ this.baseConf = conf;
+ this.namedOutputs = getNamedOutputs(conf);
+ this.outputStates = Maps.newHashMap();
+ this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
}
@SuppressWarnings("unchecked")
@@ -128,63 +173,226 @@ public class CrunchOutputs<K, V> {
throw new IllegalArgumentException("Undefined named output '" +
namedOutput + "'");
}
- TaskAttemptContext taskContext = getContext(namedOutput);
if (!disableOutputCounters) {
baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
}
- getRecordWriter(taskContext, namedOutput).write(key, value);
+ getOutputState(namedOutput).write(key, value);
}
public void close() throws IOException, InterruptedException {
- for (RecordWriter<?, ?> writer : recordWriters.values()) {
- writer.close(baseContext);
+ for (OutputState<?, ?> out : outputStates.values()) {
+ out.close();
}
}
-
- private TaskAttemptContext getContext(String nameOutput) throws IOException {
- TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
- if (taskContext != null) {
- return taskContext;
+
+ private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
+ OutputState<?, ?> out = outputStates.get(namedOutput);
+ if (out != null) {
+ return (OutputState<K, V>) out;
}
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
- OutputConfig outConfig = namedOutputs.get(nameOutput);
- Configuration conf = new Configuration(baseContext.getConfiguration());
- Job job = new Job(conf);
- job.getConfiguration().set("crunch.namedoutput", nameOutput);
+ Job job = getJob(baseContext.getJobID(), namedOutput, baseConf);
+
+ // Get a job with the expected named output.
+ job = getJob(job.getJobID(), namedOutput,baseConf);
+
+ OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
+ TaskAttemptContext taskContext = null;
+ RecordWriter<K, V> recordWriter = null;
+
+ if (baseContext != null) {
+ taskContext = getTaskContext(baseContext, job);
+
+ recordWriter = fmt.getRecordWriter(taskContext);
+ }
+ OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
+ this.outputStates.put(namedOutput, outputState);
+ return outputState;
+ }
+
+ private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
+ throws IOException {
+ Job job = new Job(new Configuration(baseConf));
+ job.getConfiguration().set("crunch.namedoutput", namedOutput);
+ setJobID(job, jobID, namedOutput);
+ return job;
+ }
+
+ private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) {
+
+ org.apache.hadoop.mapreduce.TaskAttemptID baseTaskId = baseContext.getTaskAttemptID();
+
+ // Create a task ID context with our specialized job ID.
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId;
+ taskId = new org.apache.hadoop.mapreduce.TaskAttemptID(job.getJobID().getJtIdentifier(),
+ job.getJobID().getId(),
+ baseTaskId.isMap(),
+ baseTaskId.getTaskID().getId(),
+ baseTaskId.getId());
+
+ return TaskAttemptContextFactory.create(
+ job.getConfiguration(), taskId);
+ }
+
+ private static void setJobID(Job job, JobID jobID, String namedOutput) {
+ Method setJobIDMethod;
+ JobID newJobID = jobID;
+ try {
+ // Hadoop 2
+ setJobIDMethod = Job.class.getMethod("setJobID", JobID.class);
+ // Add the named output to the job ID, since that is used by some output formats
+ // to create temporary outputs.
+ newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
+ jobID :
+ new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
+ } catch (NoSuchMethodException e) {
+ // Hadoop 1's setJobID method is package private and declared by JobContext
+ try {
+ setJobIDMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class);
+ } catch (NoSuchMethodException e1) {
+ throw new CrunchRuntimeException(e);
+ }
+ setJobIDMethod.setAccessible(true);
+ }
+ try {
+ setJobIDMethod.invoke(job, newJobID);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Could not set job ID to " + jobID, e);
+ }
+ }
+
+ private static void configureJob(
+ String namedOutput,
+ Job job,
+ OutputConfig outConfig) throws IOException {
+ job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
job.setOutputFormatClass(outConfig.bundle.getFormatClass());
job.setOutputKeyClass(outConfig.keyClass);
job.setOutputValueClass(outConfig.valueClass);
outConfig.bundle.configure(job.getConfiguration());
- taskContext = TaskAttemptContextFactory.create(
- job.getConfiguration(), baseContext.getTaskAttemptID());
+ }
- taskContextCache.put(nameOutput, taskContext);
- return taskContext;
+ private static OutputFormat getOutputFormat(
+ String namedOutput,
+ Job job,
+ OutputConfig outConfig) throws IOException {
+ configureJob(namedOutput, job, outConfig);
+ try {
+ return ReflectionUtils.newInstance(
+ job.getOutputFormatClass(),
+ job.getConfiguration());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
}
- private synchronized RecordWriter<K, V> getRecordWriter(
- TaskAttemptContext taskContext, String namedOutput)
- throws IOException, InterruptedException {
- // look for record-writer in the cache
- RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+ private static class OutputState<K, V> {
+ private final TaskAttemptContext context;
+ private final RecordWriter<K, V> recordWriter;
- // If not in cache, create a new one
- if (writer == null) {
- // get the record writer from context output format
- taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
- try {
- OutputFormat format = ReflectionUtils.newInstance(
- taskContext.getOutputFormatClass(),
- taskContext.getConfiguration());
- writer = format.getRecordWriter(taskContext);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
+ public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
+ this.context = context;
+ this.recordWriter = recordWriter;
+ }
+
+ public void write(K key, V value) throws IOException, InterruptedException {
+ recordWriter.write(key, value);
+ }
+
+ public void close() throws IOException, InterruptedException {
+ recordWriter.close(context);
+ }
+ }
+
+ private static class CompositeOutputCommitter extends OutputCommitter {
+
+ private final Map<String, OutputConfig> outputs;
+ private final Map<String, OutputCommitter> committers;
+
+ public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
+ this.outputs = outputs;
+ this.committers = committers;
+ }
+
+ private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
+ Job job = getJob(baseContext.getJobID(), namedOutput, baseContext.getConfiguration());
+ configureJob(namedOutput, job, outputs.get(namedOutput));
+
+ return getTaskContext(baseContext, job);
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ e.getValue().setupJob(job);
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
+ }
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+
+ e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
}
- recordWriters.put(namedOutput, writer);
}
- return writer;
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ Set<Path> handledPaths = Sets.newHashSet();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ OutputCommitter oc = e.getValue();
+ if (oc instanceof FileOutputCommitter) {
+ Path workPath = ((FileOutputCommitter) oc).getWorkPath();
+ if (handledPaths.contains(workPath)) {
+ continue;
+ } else {
+ handledPaths.add(workPath);
+ }
+ }
+ Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ oc.commitJob(job);
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
+ Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
+ configureJob(e.getKey(), job, outputs.get(e.getKey()));
+ e.getValue().abortJob(job, state);
+ }
+ }
}
}