You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/21 23:29:43 UTC
tez git commit: TEZ-2732. DefaultSorter throws ArrayIndex exceptions
on 2047 Mb size sort buffers (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 663ead2dc -> 2e621ed50
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e621ed5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e621ed5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e621ed5
Branch: refs/heads/master
Commit: 2e621ed5067780bb5093851afddfbd571d813fa3
Parents: 663ead2
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Sat Aug 22 03:03:29 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Sat Aug 22 03:03:29 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../common/sort/impl/dflt/DefaultSorter.java | 25 ++++--
.../sort/impl/dflt/TestDefaultSorter.java | 91 +++++++++++++++++++-
3 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fe9627..61b583b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2730. tez-api missing dependency on org.codehaus.jettison for json.
@@ -75,6 +76,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2540. Create both tez-dist minimal and minimal.tar.gz formats as part of build
@@ -298,6 +300,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -507,6 +510,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 0e0626c..edc02f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -120,6 +120,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private long totalKeys = 0;
private long sameKey = 0;
+ public static final int MAX_IO_SORT_MB = 1800;
+
public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
@@ -205,13 +207,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
"=" + availableMemoryMB + ". It should be > 0");
}
- if (availableMemoryMB > 2047) {
+ if (availableMemoryMB > MAX_IO_SORT_MB) {
LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
- "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)");
+ "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB
+ + " (max sort buffer size supported forDefaultSorter)");
}
- //cap sort buffer to 2047 for DefaultSorter.
- return Math.min(2047, availableMemoryMB);
+ // cap sort buffer to MAX_IO_SORT_MB for DefaultSorter.
+ // Not using 2047 to avoid any ArrayIndexOutOfBoundsException in collect() phase.
+ return Math.min(MAX_IO_SORT_MB, availableMemoryMB);
}
@Override
@@ -277,6 +281,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
+ /**
+ * Reason for capping sort buffer to MAX_IO_SORT_MB
+ * E.g
+ * kvbuffer.length = 2146435072 (2047 MB)
+ * Corner case: bufindex=2026133899, kvbidx=523629312.
+ * distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485
+ * newPos = 2026133899 + max(.., min(643930485/2, 271128624)) >= 2297262523,
+ * which exceeds Integer.MAX_VALUE (2147483647), so the int addition would overflow.
+ */
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
@@ -608,7 +621,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
}
// here, we know that we have sufficient space to write
- if (bufindex + len > bufvoid) {
+ // int overflow possible with (bufindex + len)
+ long futureBufIndex = (long) bufindex + len;
+ if (futureBufIndex > bufvoid) {
final int gaplen = bufvoid - bufindex;
System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
len -= gaplen;
http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 2fb3986..c22e605 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -31,14 +31,18 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
@@ -62,6 +66,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -127,11 +132,91 @@ public class TestDefaultSorter {
}
}
+
+ @Test
+ @Ignore
+ /**
+ * Disabled because this test needs io.sort.mb set to 2047 MB.
+ * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
+ */
+ public void testSortLimitsWithSmallRecord() throws IOException {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName());
+ OutputContext context = createTezOutputContext();
+
+ doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
+
+ //Setting IO_SORT_MB to 2047 MB
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+ context.requestInitialMemory(
+ ExternalSorter.getInitialMemoryRequirement(conf,
+ context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+ DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+ //Reset key/value in conf back to Text for other test cases
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+
+ int i = 0;
+ /**
+ * If io.sort.mb is not capped to 1800, this would end up throwing
+ * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+ * The loop is intentionally infinite.
+ */
+ while (true) {
+ //test for the avg record size 2 (in lower spectrum)
+ Text key = new Text(i + "");
+ sorter.write(key, NullWritable.get());
+ i = (i + 1) % 10;
+ }
+ }
+
+ @Test
+ @Ignore
+ /**
+ * Disabling this, as this would need 2047 MB io.sort.mb for testing.
+ * Provide > 2GB to JVM when running this test to avoid OOM in string generation.
+ *
+ * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
+ */
+ public void testSortLimitsWithLargeRecords() throws IOException {
+ OutputContext context = createTezOutputContext();
+
+ doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
+
+ //Setting IO_SORT_MB to 2047 MB
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+ context.requestInitialMemory(
+ ExternalSorter.getInitialMemoryRequirement(conf,
+ context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+ DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+ int i = 0;
+ /**
+ * If io.sort.mb is not capped to 1800, this would end up throwing
+ * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+ * The loop is intentionally infinite.
+ */
+ while (true) {
+ Text key = new Text(i + "");
+ //Generate a random value size from 1 MB (inclusive) to 100 MB (exclusive).
+ int valSize = ThreadLocalRandom.current().nextInt(1 * 1024 * 1024, 100 * 1024 * 1024);
+ String val = StringInterner.weakIntern(StringUtils.repeat("v", valSize));
+ sorter.write(key, new Text(val));
+ i = (i + 1) % 10;
+ }
+ }
+
+
@Test(timeout = 5000)
public void testSortMBLimits() throws Exception {
- assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047);
- assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047);
+ assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+ DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB);
+ assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+ DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB);
assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024);
try {
@@ -166,7 +251,7 @@ public class TestDefaultSorter {
conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
- context.getTotalMemoryAvailableToTask()), handler);
+ context.getTotalMemoryAvailableToTask()), handler);
DefaultSorter sorter = new DefaultSorter(context, conf, 5, handler.getMemoryAssigned());
//Write 1000 keys each of size 1000, (> 1 spill should happen)