You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/04 09:08:48 UTC

tez git commit: TEZ-2405. PipelinedSorter can throw NPE with custom comparator (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master f6ea0fb33 -> c411e4edc


TEZ-2405. PipelinedSorter can throw NPE with custom comparator (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c411e4ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c411e4ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c411e4ed

Branch: refs/heads/master
Commit: c411e4edced690d111dac3cf2afcbb6cd39354f4
Parents: f6ea0fb
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon May 4 12:38:34 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon May 4 12:38:34 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/sort/impl/PipelinedSorter.java       |  2 +-
 .../common/sort/impl/TestPipelinedSorter.java   | 53 +++++++++++++++++---
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8108ac8..6c19770 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2405. PipelinedSorter can throw NPE with custom comparator.
   TEZ-1897. Create a concurrent version of AsyncDispatcher
   TEZ-2394. Issues when there is an error in VertexManager callbacks
   TEZ-2386. Tez UI: Inconsistent usage of icon colors

http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 65606bf..661f54c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -749,7 +749,7 @@ public class PipelinedSorter extends ExternalSorter {
         cmp = comparator.compare(buf,
             keystart + off , (valstart - keystart),
             needle.getData(),
-            needle.getPosition(), needle.getLength());
+            needle.getPosition(), (needle.getLength() - needle.getPosition()));
       }
       return cmp;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 6e56567..5de96c9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -22,6 +23,7 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,10 +63,10 @@ public class TestPipelinedSorter {
   private static final Configuration conf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private OutputContext outputContext;
 
   private int numOutputs;
   private long initialAvailableMem;
-  private OutputContext outputContext;
 
   //TODO: Need to make it nested structure so that multiple partition cases can be validated
   private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
@@ -82,6 +84,11 @@ public class TestPipelinedSorter {
     }
   }
 
+  @AfterClass
+  public static void cleanup() throws IOException {
+    localFs.delete(workDir, true);
+  }
+
   @Before
   public void setup() throws IOException {
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
@@ -89,12 +96,14 @@ public class TestPipelinedSorter {
     String uniqueId = UUID.randomUUID().toString();
     this.outputContext = createMockOutputContext(counters, appId, uniqueId);
 
-    //To enable PipelinedSorter, set 2 threads
+    //To enable PipelinedSorter
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
+
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
-        HashPartitioner.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
 
     //Setup localdirs
     String localDirs = workDir.toString();
@@ -102,9 +111,8 @@ public class TestPipelinedSorter {
   }
 
   @After
-  public void cleanup() throws IOException {
-    localFs.delete(workDir, true);
-    sortedDataMap.clear();
+  public void reset() throws IOException {
+    cleanup();
     localFs.mkdirs(workDir);
   }
 
@@ -133,6 +141,13 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  public void testWithCustomComparator() throws IOException {
+    //Test with custom comparator
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
+    basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
+  }
+
+  @Test
   public void testWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 5 *1024 * 1024;
@@ -251,7 +266,7 @@ public class TestPipelinedSorter {
     Assert.assertTrue(numRecordsRead == sortedDataMap.size());
   }
 
-  private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
+  private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
       String uniqueId) throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
 
@@ -280,4 +295,26 @@ public class TestPipelinedSorter {
     doReturn(outDirs).when(outputContext).getWorkDirs();
     return outputContext;
   }
+
+  /**
+   * E.g., Hive uses TezBytesComparator, which internally makes use of WritableComparator's
+   * comparison. Any length mismatches are handled there.
+   *
+   * However, custom comparators can handle this differently and might throw
+   * IndexOutOfBoundsException in case of invalid lengths.
+   *
+   * This comparator (similar to the comparator in BinInterSedes of Pig) throws an exception when
+   * wrong lengths are passed.
+   */
+  public static class CustomComparator extends WritableComparator {
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      //wrapping is done so that it would throw exceptions on wrong lengths
+      ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+      ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+
+      return bb1.compareTo(bb2);
+    }
+
+  }
 }