You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/10/30 15:35:13 UTC
tez git commit: TEZ-3215. Support for MultipleOutputs. (mingma)
Repository: tez
Updated Branches:
refs/heads/master a2f8cc3d9 -> a328d469d
TEZ-3215. Support for MultipleOutputs. (mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a328d469
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a328d469
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a328d469
Branch: refs/heads/master
Commit: a328d469d3de53eae087aee62b10140531a87722
Parents: a2f8cc3
Author: Ming Ma <mi...@twitter.com>
Authored: Sun Oct 30 08:34:48 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Sun Oct 30 08:34:48 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/hadoop/MRJobConfig.java | 6 +
.../apache/tez/mapreduce/output/MROutput.java | 138 +++++++++----
.../tez/mapreduce/output/MultiMROutput.java | 203 +++++++++++++++++++
.../tez/mapreduce/output/TestMultiMROutput.java | 193 ++++++++++++++++++
.../library/api/KeyValueWriterWithBasePath.java | 49 +++++
6 files changed, 546 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 83e0b59..b4beb80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3215. Support for MultipleOutputs.
TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
TEZ-3487. Improvements in travis yml file to get builds to work.
TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it.
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 7db98bc..02c74b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -132,6 +132,12 @@ public interface MRJobConfig {
public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
+  /**
+   * Configuration key holding the real OutputFormat class when MROutput is
+   * configured to wrap it in a LazyOutputFormat.
+   */
+  public static final String LAZY_OUTPUTFORMAT_OUTPUTFORMAT =
+      "mapreduce.output.lazyoutputformat.outputformat";
+
+  /**
+   * Configuration key for the base output file name. MROutput sets it to its
+   * output file name prefix for the new MapReduce API when it is unset, and
+   * MultiMROutput sets it to the base path of each per-path record writer.
+   */
+  public static final String FILEOUTPUTFORMAT_BASE_OUTPUT_NAME =
+      "mapreduce.output.basename";
+
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 043085d..6ed70c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -73,14 +75,14 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
* {@link MROutput} is an {@link Output} which allows key/values pairs
* to be written by a processor.
*
- * It is compatible with all standard Apache Hadoop MapReduce
+ * It is compatible with all standard Apache Hadoop MapReduce
* OutputFormat implementations.
- *
+ *
* This class is not meant to be extended by external projects.
*/
@Public
public class MROutput extends AbstractLogicalOutput {
-
+
/**
* Helper class to configure {@link MROutput}
*
@@ -94,18 +96,36 @@ public class MROutput extends AbstractLogicalOutput {
String outputClassName = MROutput.class.getName();
String outputPath;
boolean doCommit = true;
-
- private MROutputConfigBuilder(Configuration conf, Class<?> outputFormatParam) {
+
+ private MROutputConfigBuilder(Configuration conf,
+ Class<?> outputFormatParam, boolean useLazyOutputFormat) {
this.conf = conf;
if (outputFormatParam != null) {
outputFormatProvided = true;
- this.outputFormat = outputFormatParam;
- if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(outputFormatParam)) {
+ if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(
+ outputFormatParam)) {
useNewApi = false;
- } else if (org.apache.hadoop.mapreduce.OutputFormat.class.isAssignableFrom(outputFormatParam)) {
+ if (!useLazyOutputFormat) {
+ this.outputFormat = outputFormatParam;
+ } else {
+ conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT,
+ outputFormatParam,
+ org.apache.hadoop.mapred.OutputFormat.class);
+ this.outputFormat =
+ org.apache.hadoop.mapred.lib.LazyOutputFormat.class;
+ }
+ } else if (OutputFormat.class.isAssignableFrom(outputFormatParam)) {
useNewApi = true;
+ if (!useLazyOutputFormat) {
+ this.outputFormat = outputFormatParam;
+ } else {
+ conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT,
+ outputFormatParam, OutputFormat.class);
+ this.outputFormat = LazyOutputFormat.class;
+ }
} else {
- throw new TezUncheckedException("outputFormat must be assignable from either " +
+ throw new TezUncheckedException(
+ "outputFormat must be assignable from either " +
"org.apache.hadoop.mapred.OutputFormat or " +
"org.apache.hadoop.mapreduce.OutputFormat" +
" Given: " + outputFormatParam.getName());
@@ -145,8 +165,21 @@ public class MROutput extends AbstractLogicalOutput {
}
private MROutputConfigBuilder setOutputPath(String outputPath) {
- if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) ||
- FileOutputFormat.class.isAssignableFrom(outputFormat))) {
+ boolean passNewLazyOutputFormatCheck =
+ (LazyOutputFormat.class.isAssignableFrom(outputFormat)) &&
+ org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.
+ isAssignableFrom(conf.getClass(
+ MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null));
+ boolean passOldLazyOutputFormatCheck =
+ (org.apache.hadoop.mapred.lib.LazyOutputFormat.class.
+ isAssignableFrom(outputFormat)) &&
+ FileOutputFormat.class.isAssignableFrom(conf.getClass(
+ MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null));
+
+ if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.
+ isAssignableFrom(outputFormat) ||
+ FileOutputFormat.class.isAssignableFrom(outputFormat) ||
+ passNewLazyOutputFormatCheck || passOldLazyOutputFormatCheck)) {
throw new TezUncheckedException("When setting outputPath the outputFormat must " +
"be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " +
"org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " +
@@ -277,7 +310,12 @@ public class MROutput extends AbstractLogicalOutput {
*/
public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
@Nullable Class<?> outputFormat) {
- return new MROutputConfigBuilder(conf, outputFormat);
+ return createConfigBuilder(conf, outputFormat, false);
+ }
+
+ public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+ @Nullable Class<?> outputFormat, boolean useLazyOutputFormat) {
+ return new MROutputConfigBuilder(conf, outputFormat, useLazyOutputFormat);
}
/**
@@ -298,9 +336,14 @@ public class MROutput extends AbstractLogicalOutput {
* @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
*/
public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
- @Nullable Class<?> outputFormat,
- @Nullable String outputPath) {
- MROutputConfigBuilder configurer = new MROutputConfigBuilder(conf, outputFormat);
+ @Nullable Class<?> outputFormat, @Nullable String outputPath) {
+ return createConfigBuilder(conf, outputFormat, outputPath, false);
+ }
+
+ public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+ @Nullable Class<?> outputFormat, @Nullable String outputPath,
+ boolean useLazyOutputFormat) {
+ MROutputConfigBuilder configurer = createConfigBuilder(conf, outputFormat, useLazyOutputFormat);
if (outputPath != null) {
configurer.setOutputPath(outputPath);
}
@@ -312,9 +355,9 @@ public class MROutput extends AbstractLogicalOutput {
private final NumberFormat taskNumberFormat = NumberFormat.getInstance();
private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance();
- private JobConf jobConf;
+ protected JobConf jobConf;
boolean useNewApi;
- private AtomicBoolean flushed = new AtomicBoolean(false);
+ protected AtomicBoolean flushed = new AtomicBoolean(false);
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
@@ -326,7 +369,7 @@ public class MROutput extends AbstractLogicalOutput {
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
- private TezCounter outputRecordCounter;
+ protected TezCounter outputRecordCounter;
@VisibleForTesting
TaskAttemptContext newApiTaskAttemptContext;
@@ -344,6 +387,12 @@ public class MROutput extends AbstractLogicalOutput {
@Override
public List<Event> initialize() throws IOException, InterruptedException {
+ List<Event> events = initializeBase();
+ initWriter();
+ return events;
+ }
+
+ protected List<Event> initializeBase() throws IOException, InterruptedException {
getContext().requestInitialMemory(0l, null); //mandatory call
taskNumberFormat.setMinimumIntegerDigits(5);
taskNumberFormat.setGroupingUsed(false);
@@ -373,18 +422,18 @@ public class MROutput extends AbstractLogicalOutput {
taskAttemptId.getTaskID().getId());
jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
- if (useNewApi) {
- // set the output part name to have a unique prefix
- if (jobConf.get("mapreduce.output.basename") == null) {
- jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix());
- }
- }
-
String outputFormatClassName;
- outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+ outputRecordCounter = getContext().getCounters().findCounter(
+ TaskCounter.OUTPUT_RECORDS);
if (useNewApi) {
+ // set the output part name to have a unique prefix
+ if (jobConf.get(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME) == null) {
+ jobConf.set(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME,
+ getOutputFileNamePrefix());
+ }
+
newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
try {
newOutputFormat =
@@ -396,13 +445,6 @@ public class MROutput extends AbstractLogicalOutput {
}
initCommitter(jobConf, useNewApi);
-
- try {
- newRecordWriter =
- newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while creating record writer", e);
- }
} else {
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
@@ -412,13 +454,6 @@ public class MROutput extends AbstractLogicalOutput {
outputFormatClassName = oldOutputFormat.getClass().getName();
initCommitter(jobConf, useNewApi);
-
- FileSystem fs = FileSystem.get(jobConf);
- String finalName = getOutputName();
-
- oldRecordWriter =
- oldOutputFormat.getRecordWriter(
- fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
}
LOG.info(getContext().getDestinationVertexName() + ": "
@@ -427,6 +462,22 @@ public class MROutput extends AbstractLogicalOutput {
return null;
}
+  /**
+   * Opens the single record writer used by this output. It is invoked from
+   * initialize() after initializeBase() has created the output format and the
+   * task attempt context for whichever MapReduce API (new or old) is in use.
+   *
+   * @throws IOException if the writer cannot be created, including when
+   *     creation of a new-API writer is interrupted
+   */
+  private void initWriter() throws IOException {
+    if (!useNewApi) {
+      // Old API: the writer is opened with an explicit file name built from
+      // the output file name prefix and this task's index.
+      FileSystem fileSystem = FileSystem.get(jobConf);
+      String outputName = getOutputName(getOutputFileNamePrefix());
+      MRReporter reporter = new MRReporter(getContext().getCounters());
+      oldRecordWriter = oldOutputFormat.getRecordWriter(
+          fileSystem, jobConf, outputName, reporter);
+      return;
+    }
+    try {
+      newRecordWriter = newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while creating record writer", ie);
+    }
+  }
+
@Override
public void start() {
}
@@ -475,7 +526,7 @@ public class MROutput extends AbstractLogicalOutput {
isMapperOutput, null);
}
- private String getOutputFileNamePrefix() {
+ protected String getOutputFileNamePrefix() {
String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
if (prefix == null) {
prefix = "part-v" +
@@ -485,10 +536,9 @@ public class MROutput extends AbstractLogicalOutput {
return prefix;
}
- private String getOutputName() {
+ protected String getOutputName(String prefix) {
// give a unique prefix to the output name
- return getOutputFileNamePrefix() +
- "-" + taskNumberFormat.format(getContext().getTaskIndex());
+ return prefix + "-" + taskNumberFormat.format(getContext().getTaskIndex());
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
new file mode 100644
index 0000000..0bd573b
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.output;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValueWriterWithBasePath;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+
+/**
+ * {@link MultiMROutput} is an {@link Output} which allows key/values pairs
+ * to be written by a processor to different output files.
+ *
+ * Every distinct base path passed to
+ * {@link KeyValueWriterWithBasePath#write(Object, Object, String)} gets its
+ * own record writer, created on first use and cached; all cached writers are
+ * closed by {@link #flush()}. Writing without a base path is not supported.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * OutputFormat implementations.
+ *
+ */
+@Public
+public class MultiMROutput extends MROutput {
+
+  /** New-API writers keyed by base path; allocated only when useNewApi. */
+  Map<String, org.apache.hadoop.mapreduce.RecordWriter<?, ?>>
+      newRecordWriters;
+
+  /** Old-API writers keyed by base path; allocated only when !useNewApi. */
+  Map<String, org.apache.hadoop.mapred.RecordWriter<?, ?>>
+      oldRecordWriters;
+
+  public MultiMROutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
+  /**
+   * Runs the shared MROutput initialization but, unlike MROutput, does not
+   * open a single record writer; writers are created lazily per base path.
+   */
+  @Override
+  public List<Event> initialize() throws IOException, InterruptedException {
+    List<Event> events = super.initializeBase();
+    if (useNewApi) {
+      newRecordWriters = new HashMap<>();
+    } else {
+      oldRecordWriters = new HashMap<>();
+    }
+    return events;
+  }
+
+  /**
+   * Create an
+   * {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
+   * whose output class is {@link MultiMROutput}.
+   *
+   * @param conf Configuration for the {@link MROutput}
+   * @param outputFormat FileOutputFormat derived class
+   * @param outputPath Output path
+   * @param useLazyOutputFormat whether outputFormat should be wrapped in a
+   *          LazyOutputFormat
+   * @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
+   */
+  public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+      Class<?> outputFormat, String outputPath, boolean useLazyOutputFormat) {
+    return MROutput.createConfigBuilder(conf, outputFormat, outputPath, useLazyOutputFormat)
+        .setOutputClassName(MultiMROutput.class.getName());
+  }
+
+  @Override
+  public KeyValueWriterWithBasePath getWriter() throws IOException {
+    return new KeyValueWriterWithBasePath() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        throw new UnsupportedOperationException(
+            "Write without basePath isn't supported.");
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value, String basePath)
+          throws IOException {
+        validateBasePath(basePath);
+        if (useNewApi) {
+          try {
+            getNewRecordWriter(newApiTaskAttemptContext, basePath).write(
+                key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOInterruptedException(
+                "Interrupted while writing next key-value",e);
+          }
+        } else {
+          getOldRecordWriter(basePath).write(key, value);
+        }
+        outputRecordCounter.increment(1);
+        getContext().notifyProgress();
+      }
+    };
+  }
+
+  /**
+   * Rejects base paths that would place task output outside the output
+   * committer's task work path.
+   *
+   * @param basePath base path supplied by the caller of write
+   * @throws UnsupportedOperationException if basePath is null, absolute, or
+   *           contains a ".." path segment
+   */
+  private static void validateBasePath(String basePath) {
+    if (basePath == null) {
+      throw new UnsupportedOperationException(
+          "Write without basePath isn't supported.");
+    }
+    if (basePath.length() > 0 && basePath.charAt(0) == '/' ) {
+      // The base path can't be absolute path starting with "/".
+      // Otherwise, it will cause the task temporary files being
+      // written outside the output committer's task work path.
+      throw new UnsupportedOperationException(
+          "Write with absolute basePath isn't supported.");
+    }
+    // A ".." segment escapes the task work path just like an absolute path
+    // once the base path is resolved against it.
+    for (String segment : basePath.split("/")) {
+      if ("..".equals(segment)) {
+        throw new UnsupportedOperationException(
+            "Write with basePath containing '..' isn't supported.");
+      }
+    }
+  }
+
+  /**
+   * Closes every record writer created so far. Call this in the processor
+   * before finishing to ensure all outputs have been flushed; it must be
+   * called before commit. Calls after the first are no-ops.
+   *
+   * @throws IOException if closing a writer fails or the thread is
+   *           interrupted while closing
+   */
+  @Override
+  public void flush() throws IOException {
+    if (flushed.getAndSet(true)) {
+      return;
+    }
+    try {
+      if (useNewApi) {
+        for (RecordWriter<?, ?> writer : newRecordWriters.values()) {
+          writer.close(newApiTaskAttemptContext);
+        }
+      } else {
+        for (org.apache.hadoop.mapred.RecordWriter<?, ?> writer :
+            oldRecordWriters.values()) {
+          writer.close(null);
+        }
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status, as write() does.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while closing record writer", e);
+    }
+  }
+
+  /**
+   * Returns the cached new-API record writer for {@code baseFileName},
+   * creating it on first use. Before creating it, the base name is stored in
+   * the task context configuration under
+   * {@link MRJobConfig#FILEOUTPUTFORMAT_BASE_OUTPUT_NAME}, and a fresh
+   * OutputFormat instance is built from that configuration.
+   * NOTE(review): this mutates the shared task context configuration.
+   */
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getNewRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName)
+      throws IOException, InterruptedException {
+
+    // look for record-writer in the cache
+    RecordWriter writer = newRecordWriters.get(baseFileName);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(
+          MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+            .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      // add the record-writer to the cache
+      newRecordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  /**
+   * Returns the cached old-API record writer for {@code baseFileName},
+   * creating it on first use with the file name produced by
+   * getOutputName(baseFileName).
+   */
+  @SuppressWarnings("unchecked")
+  private synchronized org.apache.hadoop.mapred.RecordWriter
+      getOldRecordWriter(String baseFileName) throws IOException {
+
+    // look for record-writer in the cache
+    org.apache.hadoop.mapred.RecordWriter writer =
+        oldRecordWriters.get(baseFileName);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = getOutputName(baseFileName);
+      writer = oldOutputFormat.getRecordWriter(fs, jobConf,
+          finalName, new MRReporter(getContext().getCounters()));
+      // add the record-writer to the cache
+      oldRecordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
new file mode 100644
index 0000000..3618e40
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestMultiMROutput {
+
+  @Test(timeout = 5000)
+  public void testNewAPI_TextOutputFormat() throws Exception {
+    validate(true, TextOutputFormat.class, true, FileOutputCommitter.class,
+        false);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_TextOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false,
+        org.apache.hadoop.mapred.FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_SequenceFileOutputFormat() throws Exception {
+    validate(true, SequenceFileOutputFormat.class, false,
+        FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_SequenceFileOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        false, org.apache.hadoop.mapred.FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_LazySequenceFileOutputFormat() throws Exception {
+    validate(true, SequenceFileOutputFormat.class, false,
+        FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_LazySequenceFileOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        false, org.apache.hadoop.mapred.FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_LazyTextOutputFormat() throws Exception {
+    validate(true, TextOutputFormat.class, false,
+        FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_LazyTextOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false,
+        org.apache.hadoop.mapred.FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testInvalidBasePath() throws Exception {
+    MultiMROutput outputs = createMROutputs(SequenceFileOutputFormat.class,
+        false, true);
+    try {
+      outputs.getWriter().write(new Text(Integer.toString(0)),
+          new Text("foo"), "/tmp");
+      Assert.fail("Write with an absolute basePath should have been rejected");
+    } catch (UnsupportedOperationException expected) {
+      // Expected: absolute base paths are not supported.
+    }
+  }
+
+  /** Builds a mocked OutputContext that returns the given payload. */
+  private OutputContext createMockOutputContext(UserPayload payload) {
+    OutputContext outputContext = mock(OutputContext.class);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    when(outputContext.getUserPayload()).thenReturn(payload);
+    when(outputContext.getApplicationId()).thenReturn(appId);
+    when(outputContext.getTaskVertexIndex()).thenReturn(1);
+    when(outputContext.getTaskAttemptNumber()).thenReturn(1);
+    when(outputContext.getCounters()).thenReturn(new TezCounters());
+    when(outputContext.getStatisticsReporter()).thenReturn(
+        mock(OutputStatisticsReporter.class));
+    return outputContext;
+  }
+
+  /**
+   * Creates and initializes a MultiMROutput, checks that only the fields of
+   * the expected MapReduce API were populated, then writes one record to each
+   * of three distinct base paths and checks that three writers were cached.
+   */
+  private void validate(boolean expectedUseNewAPIValue, Class<?> outputFormat,
+      boolean isMapper, Class<?> committerClass, boolean useLazyOutputFormat)
+      throws InterruptedException, IOException {
+    MultiMROutput output = createMROutputs(outputFormat, isMapper,
+        useLazyOutputFormat);
+
+    assertEquals(isMapper, output.isMapperOutput);
+    assertEquals(expectedUseNewAPIValue, output.useNewApi);
+    if (expectedUseNewAPIValue) {
+      if (useLazyOutputFormat) {
+        assertEquals(LazyOutputFormat.class,
+            output.newOutputFormat.getClass());
+      } else {
+        assertEquals(outputFormat, output.newOutputFormat.getClass());
+      }
+      assertNotNull(output.newApiTaskAttemptContext);
+      assertNull(output.oldOutputFormat);
+      assertEquals(Text.class,
+          output.newApiTaskAttemptContext.getOutputValueClass());
+      assertEquals(Text.class,
+          output.newApiTaskAttemptContext.getOutputKeyClass());
+      assertNull(output.oldApiTaskAttemptContext);
+      assertNotNull(output.newRecordWriters);
+      assertNull(output.oldRecordWriters);
+    } else {
+      if (!useLazyOutputFormat) {
+        assertEquals(outputFormat, output.oldOutputFormat.getClass());
+      } else {
+        assertEquals(org.apache.hadoop.mapred.lib.LazyOutputFormat.class,
+            output.oldOutputFormat.getClass());
+      }
+      assertNull(output.newOutputFormat);
+      assertNotNull(output.oldApiTaskAttemptContext);
+      assertNull(output.newApiTaskAttemptContext);
+      assertEquals(Text.class,
+          output.oldApiTaskAttemptContext.getOutputValueClass());
+      assertEquals(Text.class,
+          output.oldApiTaskAttemptContext.getOutputKeyClass());
+      assertNotNull(output.oldRecordWriters);
+      assertNull(output.newRecordWriters);
+    }
+
+    assertEquals(committerClass, output.committer.getClass());
+    int numOfUniqueKeys = 3;
+    for (int i = 0; i < numOfUniqueKeys; i++) {
+      output.getWriter().write(new Text(Integer.toString(i)),
+          new Text("foo"), Integer.toString(i));
+    }
+    output.close();
+    if (expectedUseNewAPIValue) {
+      assertEquals(numOfUniqueKeys, output.newRecordWriters.size());
+    } else {
+      assertEquals(numOfUniqueKeys, output.oldRecordWriters.size());
+    }
+  }
+
+  /**
+   * Builds and initializes a MultiMROutput with Text keys/values, output
+   * path /tmp/output and a mocked OutputContext.
+   */
+  private MultiMROutput createMROutputs(Class<?> outputFormat,
+      boolean isMapper, boolean useLazyOutputFormat)
+      throws InterruptedException, IOException {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, isMapper);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    DataSinkDescriptor dataSink = MultiMROutput.createConfigBuilder(
+        conf, outputFormat, outputPath, useLazyOutputFormat).build();
+
+    OutputContext outputContext = createMockOutputContext(
+        dataSink.getOutputDescriptor().getUserPayload());
+    MultiMROutput output = new MultiMROutput(outputContext, 2);
+    output.initialize();
+    return output;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
new file mode 100644
index 0000000..5446ca6
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value(s) pair based {@link Writer} that supports
+ * output to different files, selected by a base path supplied with each
+ * record.
+ */
+@Public
+@Evolving
+public abstract class KeyValueWriterWithBasePath extends KeyValueWriter {
+  /**
+   * Writes a key/value pair to the output identified by {@code basePath}.
+   *
+   * @param key
+   *          the key to write
+   * @param value
+   *          the value to write
+   * @param basePath
+   *          the base path of the output file. Implementations may reject
+   *          some values (for example {@code null} or absolute paths).
+   * @throws IOException
+   *           if an error occurs
+   * @throws IOInterruptedException
+   *           if IO was interrupted
+   */
+  public abstract void write(Object key, Object value, String basePath)
+      throws IOException;
+}