You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/19 22:00:47 UTC

tez git commit: TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. (Piyush Narang via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 8bfbdfefa -> 280a98ed4


TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. (Piyush Narang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/280a98ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/280a98ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/280a98ed

Branch: refs/heads/master
Commit: 280a98ed431aeb15890de1cca5397f63f0455905
Parents: 8bfbdfe
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jul 19 15:00:03 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jul 19 15:00:03 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/mapreduce/output/MROutput.java   |  5 +-
 .../tez/mapreduce/output/TestMROutput.java      | 99 +++++++++++++++++++-
 3 files changed, 103 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e29110..c400512 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
@@ -83,6 +84,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization
   TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.

http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ec83bf5..043085d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -395,6 +395,8 @@ public class MROutput extends AbstractLogicalOutput {
         throw new IOException(cnfe);
       }
 
+      initCommitter(jobConf, useNewApi);
+
       try {
         newRecordWriter =
             newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
@@ -409,6 +411,8 @@ public class MROutput extends AbstractLogicalOutput {
       oldOutputFormat = jobConf.getOutputFormat();
       outputFormatClassName = oldOutputFormat.getClass().getName();
 
+      initCommitter(jobConf, useNewApi);
+
       FileSystem fs = FileSystem.get(jobConf);
       String finalName = getOutputName();
 
@@ -416,7 +420,6 @@ public class MROutput extends AbstractLogicalOutput {
           oldOutputFormat.getRecordWriter(
               fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
     }
-    initCommitter(jobConf, useNewApi);
 
     LOG.info(getContext().getDestinationVertexName() + ": "
         + "outputFormat=" + outputFormatClassName

http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 05bcd98..8b52cc9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -34,14 +34,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -166,6 +169,58 @@ public class TestMROutput {
     assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
   }
 
+  // Test that uses NewAPI_WorkOutputPathReadingOutputFormat - this checks that getDefaultWorkFile
+  // returns a non-null path while creating recordWriters
+  @Test(timeout = 5000)
+  public void testNewAPI_WorkOutputPathOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+    DataSinkDescriptor dataSink = MROutput
+      .createConfigBuilder(conf, NewAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+      .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(true, output.isMapperOutput);
+    assertEquals(true, output.useNewApi);
+    assertEquals(NewAPI_WorkOutputPathReadingOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertNotNull(output.newApiTaskAttemptContext);
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  // Test that uses OldAPI_WorkOutputPathReadingOutputFormat - this checks that the work output path
+  // is set while creating recordWriters
+  @Test(timeout = 5000)
+  public void testOldAPI_WorkOutputPathOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+    DataSinkDescriptor dataSink = MROutput
+      .createConfigBuilder(conf, OldAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+      .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(false, output.isMapperOutput);
+    assertEquals(false, output.useNewApi);
+    assertEquals(OldAPI_WorkOutputPathReadingOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertNotNull(output.oldApiTaskAttemptContext);
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
   private OutputContext createMockOutputContext(UserPayload payload) {
     OutputContext outputContext = mock(OutputContext.class);
     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -199,7 +254,7 @@ public class TestMROutput {
         new Path(new Path(System.getProperty("test.build.data", "/tmp")),
                  "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());
 
-    LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+    return new LogicalIOProcessorRuntimeTask(
         taskSpec,
         0,
         conf,
@@ -209,7 +264,6 @@ public class TestMROutput {
         new HashMap<String, String>(),
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
         Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
-    return task;
   }
   
   public static class TestOutputCommitter extends OutputCommitter {
@@ -282,6 +336,47 @@ public class TestMROutput {
     }
   }
 
+  // OldAPI OutputFormat class that reads the work output path while creating recordWriters
+  public static class OldAPI_WorkOutputPathReadingOutputFormat extends org.apache.hadoop.mapred.FileOutputFormat<String, String> {
+    public static class NoOpRecordWriter implements org.apache.hadoop.mapred.RecordWriter<String, String> {
+      @Override
+      public void write(String key, String value) throws IOException {}
+
+      @Override
+      public void close(Reporter reporter) throws IOException {}
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordWriter<String, String> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+      // check work output path is not null
+      Path workOutputPath = org.apache.hadoop.mapred.FileOutputFormat.getWorkOutputPath(job);
+      assertNotNull(workOutputPath);
+      return new NoOpRecordWriter();
+    }
+  }
+
+  // NewAPI OutputFormat class that reads the default work file while creating recordWriters
+  public static class NewAPI_WorkOutputPathReadingOutputFormat extends FileOutputFormat<String, String> {
+    public static class NoOpRecordWriter extends RecordWriter<String, String> {
+      @Override
+      public void write(String key, String value) throws IOException, InterruptedException {
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      }
+    }
+
+    @Override
+    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+      // check default work file is not null
+      Path workOutputPath = getDefaultWorkFile(job, ".foo");
+      assertNotNull(workOutputPath);
+      return new NoOpRecordWriter();
+    }
+  }
+
   public static class TestProcessor extends SimpleProcessor {
     public TestProcessor(ProcessorContext context) {
       super(context);