You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/19 22:00:47 UTC
tez git commit: TEZ-3348. NullPointerException in Tez MROutput while
trying to write using Parquet's DeprecatedParquetOutputFormat. (Piyush Narang
via hitesh)
Repository: tez
Updated Branches:
refs/heads/master 8bfbdfefa -> 280a98ed4
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. (Piyush Narang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/280a98ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/280a98ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/280a98ed
Branch: refs/heads/master
Commit: 280a98ed431aeb15890de1cca5397f63f0455905
Parents: 8bfbdfe
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jul 19 15:00:03 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jul 19 15:00:03 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/mapreduce/output/MROutput.java | 5 +-
.../tez/mapreduce/output/TestMROutput.java | 99 +++++++++++++++++++-
3 files changed, 103 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e29110..c400512 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
@@ -83,6 +84,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ec83bf5..043085d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -395,6 +395,8 @@ public class MROutput extends AbstractLogicalOutput {
throw new IOException(cnfe);
}
+ initCommitter(jobConf, useNewApi);
+
try {
newRecordWriter =
newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
@@ -409,6 +411,8 @@ public class MROutput extends AbstractLogicalOutput {
oldOutputFormat = jobConf.getOutputFormat();
outputFormatClassName = oldOutputFormat.getClass().getName();
+ initCommitter(jobConf, useNewApi);
+
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
@@ -416,7 +420,6 @@ public class MROutput extends AbstractLogicalOutput {
oldOutputFormat.getRecordWriter(
fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
}
- initCommitter(jobConf, useNewApi);
LOG.info(getContext().getDestinationVertexName() + ": "
+ "outputFormat=" + outputFormatClassName
http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 05bcd98..8b52cc9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -34,14 +34,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -166,6 +169,58 @@ public class TestMROutput {
assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
+ // Exercises NewAPI_WorkOutputPathReadingOutputFormat: verifies that getDefaultWorkFile() returns
+ // a non-null path while the record writer is being created during MROutput initialization
+ @Test(timeout = 5000)
+ public void testNewAPI_WorkOutputPathOutputFormat() throws Exception {
+ String outputPath = "/tmp/output";
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+ DataSinkDescriptor dataSink = MROutput
+ .createConfigBuilder(conf, NewAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+ .build();
+
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ MROutput output = new MROutput(outputContext, 2);
+ output.initialize();
+
+ assertEquals(true, output.isMapperOutput);
+ assertEquals(true, output.useNewApi);
+ assertEquals(NewAPI_WorkOutputPathReadingOutputFormat.class, output.newOutputFormat.getClass());
+ assertNull(output.oldOutputFormat);
+ assertNotNull(output.newApiTaskAttemptContext);
+ assertNull(output.oldApiTaskAttemptContext);
+ assertNotNull(output.newRecordWriter);
+ assertNull(output.oldRecordWriter);
+ assertEquals(FileOutputCommitter.class, output.committer.getClass());
+ }
+
+ // Exercises OldAPI_WorkOutputPathReadingOutputFormat: verifies that the work output path is
+ // non-null while the record writer is being created during MROutput initialization
+ @Test(timeout = 5000)
+ public void testOldAPI_WorkOutputPathOutputFormat() throws Exception {
+ String outputPath = "/tmp/output";
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+ DataSinkDescriptor dataSink = MROutput
+ .createConfigBuilder(conf, OldAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+ .build();
+
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ MROutput output = new MROutput(outputContext, 2);
+ output.initialize();
+
+ assertEquals(false, output.isMapperOutput);
+ assertEquals(false, output.useNewApi);
+ assertEquals(OldAPI_WorkOutputPathReadingOutputFormat.class, output.oldOutputFormat.getClass());
+ assertNull(output.newOutputFormat);
+ assertNotNull(output.oldApiTaskAttemptContext);
+ assertNull(output.newApiTaskAttemptContext);
+ assertNotNull(output.oldRecordWriter);
+ assertNull(output.newRecordWriter);
+ assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+ }
+
private OutputContext createMockOutputContext(UserPayload payload) {
OutputContext outputContext = mock(OutputContext.class);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -199,7 +254,7 @@ public class TestMROutput {
new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());
- LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ return new LogicalIOProcessorRuntimeTask(
taskSpec,
0,
conf,
@@ -209,7 +264,6 @@ public class TestMROutput {
new HashMap<String, String>(),
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
- return task;
}
public static class TestOutputCommitter extends OutputCommitter {
@@ -282,6 +336,47 @@ public class TestMROutput {
}
}
+ // Old-API (mapred) OutputFormat that reads the work output path while creating its record writer
+ public static class OldAPI_WorkOutputPathReadingOutputFormat extends org.apache.hadoop.mapred.FileOutputFormat<String, String> {
+ public static class NoOpRecordWriter implements org.apache.hadoop.mapred.RecordWriter<String, String> {
+ @Override
+ public void write(String key, String value) throws IOException {}
+
+ @Override
+ public void close(Reporter reporter) throws IOException {}
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordWriter<String, String> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+ // check work output path is not null
+ Path workOutputPath = org.apache.hadoop.mapred.FileOutputFormat.getWorkOutputPath(job);
+ assertNotNull(workOutputPath);
+ return new NoOpRecordWriter();
+ }
+ }
+
+ // New-API (mapreduce) OutputFormat that reads the default work file while creating its record writer
+ public static class NewAPI_WorkOutputPathReadingOutputFormat extends FileOutputFormat<String, String> {
+ public static class NoOpRecordWriter extends RecordWriter<String, String> {
+ @Override
+ public void write(String key, String value) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+ }
+
+ @Override
+ public RecordWriter<String, String> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+ // check default work file is not null
+ Path workOutputPath = getDefaultWorkFile(job, ".foo");
+ assertNotNull(workOutputPath);
+ return new NoOpRecordWriter();
+ }
+ }
+
public static class TestProcessor extends SimpleProcessor {
public TestProcessor(ProcessorContext context) {
super(context);