You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/04 09:08:48 UTC
tez git commit: TEZ-2405. PipelinedSorter can throw NPE with custom
comparator (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master f6ea0fb33 -> c411e4edc
TEZ-2405. PipelinedSorter can throw NPE with custom comparator (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c411e4ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c411e4ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c411e4ed
Branch: refs/heads/master
Commit: c411e4edced690d111dac3cf2afcbb6cd39354f4
Parents: f6ea0fb
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon May 4 12:38:34 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon May 4 12:38:34 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../common/sort/impl/PipelinedSorter.java | 2 +-
.../common/sort/impl/TestPipelinedSorter.java | 53 +++++++++++++++++---
3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8108ac8..6c19770 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-2405. PipelinedSorter can throw NPE with custom comparator.
TEZ-1897. Create a concurrent version of AsyncDispatcher
TEZ-2394. Issues when there is an error in VertexManager callbacks
TEZ-2386. Tez UI: Inconsistent usage of icon colors
http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 65606bf..661f54c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -749,7 +749,7 @@ public class PipelinedSorter extends ExternalSorter {
cmp = comparator.compare(buf,
keystart + off , (valstart - keystart),
needle.getData(),
- needle.getPosition(), needle.getLength());
+ needle.getPosition(), (needle.getLength() - needle.getPosition()));
}
return cmp;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 6e56567..5de96c9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -22,6 +23,7 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,10 +63,10 @@ public class TestPipelinedSorter {
private static final Configuration conf = new Configuration();
private static FileSystem localFs = null;
private static Path workDir = null;
+ private OutputContext outputContext;
private int numOutputs;
private long initialAvailableMem;
- private OutputContext outputContext;
//TODO: Need to make it nested structure so that multiple partition cases can be validated
private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
@@ -82,6 +84,11 @@ public class TestPipelinedSorter {
}
}
+ @AfterClass
+ public static void cleanup() throws IOException {
+ localFs.delete(workDir, true);
+ }
+
@Before
public void setup() throws IOException {
ApplicationId appId = ApplicationId.newInstance(10000, 1);
@@ -89,12 +96,14 @@ public class TestPipelinedSorter {
String uniqueId = UUID.randomUUID().toString();
this.outputContext = createMockOutputContext(counters, appId, uniqueId);
- //To enable PipelinedSorter, set 2 threads
+ //To enable PipelinedSorter
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
+
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
- HashPartitioner.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
+
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
//Setup localdirs
String localDirs = workDir.toString();
@@ -102,9 +111,8 @@ public class TestPipelinedSorter {
}
@After
- public void cleanup() throws IOException {
- localFs.delete(workDir, true);
- sortedDataMap.clear();
+ public void reset() throws IOException {
+ cleanup();
localFs.mkdirs(workDir);
}
@@ -133,6 +141,13 @@ public class TestPipelinedSorter {
}
@Test
+ public void testWithCustomComparator() throws IOException {
+ //Test with custom comparator
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
+ basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
+ }
+
+ @Test
public void testWithPipelinedShuffle() throws IOException {
this.numOutputs = 1;
this.initialAvailableMem = 5 *1024 * 1024;
@@ -251,7 +266,7 @@ public class TestPipelinedSorter {
Assert.assertTrue(numRecordsRead == sortedDataMap.size());
}
- private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
+ private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
String uniqueId) throws IOException {
OutputContext outputContext = mock(OutputContext.class);
@@ -280,4 +295,26 @@ public class TestPipelinedSorter {
doReturn(outDirs).when(outputContext).getWorkDirs();
return outputContext;
}
+
+ /**
+ * E.g. Hive uses TezBytesComparator, which internally makes use of WritableComparator's comparison.
+ * Any length mismatches are handled there.
+ *
+ * However, custom comparators can handle this differently and might throw
+ * IndexOutOfBoundsException in case of invalid lengths.
+ *
+ * This comparator (similar to the comparator in Pig's BinInterSedes) throws an exception when
+ * wrong lengths are passed.
+ */
+ public static class CustomComparator extends WritableComparator {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ //wrapping is done so that it would throw exceptions on wrong lengths
+ ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+ ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+
+ return bb1.compareTo(bb2);
+ }
+
+ }
}