You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/21 23:29:43 UTC

tez git commit: TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 663ead2dc -> 2e621ed50


TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e621ed5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e621ed5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e621ed5

Branch: refs/heads/master
Commit: 2e621ed5067780bb5093851afddfbd571d813fa3
Parents: 663ead2
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Sat Aug 22 03:03:29 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Sat Aug 22 03:03:29 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +
 .../common/sort/impl/dflt/DefaultSorter.java    | 25 ++++--
 .../sort/impl/dflt/TestDefaultSorter.java       | 91 +++++++++++++++++++-
 3 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fe9627..61b583b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2730. tez-api missing dependency on org.codehaus.jettison for json.
@@ -75,6 +76,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2540. Create both tez-dist minimal and minimal.tar.gz formats as part of build
@@ -298,6 +300,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -507,6 +510,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2630. TezChild receives IP address instead of FQDN.

http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 0e0626c..edc02f3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -120,6 +120,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private long totalKeys = 0;
   private long sameKey = 0;
 
+  /** Largest sort buffer (MB) DefaultSorter allows; bigger io.sort.mb values are capped. */
+  public static final int MAX_IO_SORT_MB = 1800;

   public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
@@ -205,13 +207,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           "=" + availableMemoryMB + ". It should be > 0");
     }
 
-    if (availableMemoryMB > 2047) {
+    if (availableMemoryMB > MAX_IO_SORT_MB) {
       LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
-          "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)");
+          "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB +
+          " (max sort buffer size supported for DefaultSorter)");
     }
 
-    //cap sort buffer to 2047 for DefaultSorter.
-    return Math.min(2047, availableMemoryMB);
+    // cap sort buffer to MAX_IO_SORT_MB for DefaultSorter.
+    // Not using 2047 to avoid int overflow (ArrayIndexOutOfBoundsException) in collect().
+    return Math.min(MAX_IO_SORT_MB, availableMemoryMB);
   }
 
   @Override
@@ -277,6 +281,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
               // leave at least half the split buffer for serialization data
               // ensure that kvindex >= bufindex
               final int distkvi = distanceTo(bufindex, kvbidx);
+              /*
+               * Reason for capping the sort buffer to MAX_IO_SORT_MB, e.g.:
+               * kvbuffer.length = 2146435072 (2047 MB)
+               * Corner case: bufindex = 2026133899, kvbidx = 523629312.
+               * distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485
+               * newPos = 2026133899 + max(.., min(643930485 / 2, 271128624))
+               *        = 2026133899 + 271128624 = 2297262523 > Integer.MAX_VALUE,
+               *        so the int sum wraps around to a negative value.
+               */
               final int newPos = (bufindex +
                 Math.max(2 * METASIZE - 1,
                         Math.min(distkvi / 2,
@@ -608,7 +621,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         }
       }
       // here, we know that we have sufficient space to write
-      if (bufindex + len > bufvoid) {
+      // bufindex + len can overflow int for large buffers, so compare in long arithmetic.
+      long futureBufIndex = (long) bufindex + len;
+      if (futureBufIndex > bufvoid) {
         final int gaplen = bufvoid - bufindex;
         System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
         len -= gaplen;

http://git-wip-us.apache.org/repos/asf/tez/blob/2e621ed5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 2fb3986..c22e605 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -31,14 +31,18 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.protobuf.ByteString;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -62,6 +66,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -127,11 +132,91 @@ public class TestDefaultSorter {
     }
   }
 
+
+  /**
+   * Disabled, as this needs a 2047 MB io.sort.mb for testing.
+   * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
+   */
+  @Test
+  @Ignore
+  public void testSortLimitsWithSmallRecord() throws IOException {
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName());
+    OutputContext context = createTezOutputContext();
+
+    doReturn(2800L * 1024 * 1024).when(context).getTotalMemoryAvailableToTask();
+
+    //Setting IO_SORT_MB to 2047 MB
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+    context.requestInitialMemory(
+        ExternalSorter.getInitialMemoryRequirement(conf,
+            context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+    DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+    //Reset key/value in conf back to Text for other test cases
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+
+    int i = 0;
+    /*
+     * If io.sort.mb is not capped (MAX_IO_SORT_MB), this would eventually throw
+     * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+     * The loop is intentionally infinite.
+     */
+    while (true) {
+      //test for the avg record size 2 (in lower spectrum)
+      Text key = new Text(i + "");
+      sorter.write(key, NullWritable.get());
+      i = (i + 1) % 10;
+    }
+  }
+
+  /**
+   * Disabled, as this needs a 2047 MB io.sort.mb for testing.
+   * Provide > 2GB to JVM when running this test to avoid OOM in string generation.
+   *
+   * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
+   */
+  @Test
+  @Ignore
+  public void testSortLimitsWithLargeRecords() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    doReturn(2800L * 1024 * 1024).when(context).getTotalMemoryAvailableToTask();
+
+    //Setting IO_SORT_MB to 2047 MB
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+    context.requestInitialMemory(
+        ExternalSorter.getInitialMemoryRequirement(conf,
+            context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+    DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+    int i = 0;
+    /*
+     * If io.sort.mb is not capped (MAX_IO_SORT_MB), this would eventually throw
+     * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+     * The loop is intentionally infinite.
+     */
+    while (true) {
+      Text key = new Text(i + "");
+      // Generate a random value size between 1 MB and 100 MB.
+      int valSize = ThreadLocalRandom.current().nextInt(1 * 1024 * 1024, 100 * 1024 * 1024);
+      String val = StringInterner.weakIntern(StringUtils.repeat("v", valSize));
+      sorter.write(key, new Text(val));
+      i = (i + 1) % 10;
+    }
+  }
+
+
   @Test(timeout = 5000)
   public void testSortMBLimits() throws Exception {
 
-    assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047);
-    assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047);
+    assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+        DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB);
+    assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+        DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB);
     assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024);
 
     try {
@@ -166,7 +251,7 @@ public class TestDefaultSorter {
 
     conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
     context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
-            context.getTotalMemoryAvailableToTask()), handler);
+        context.getTotalMemoryAvailableToTask()), handler);
     DefaultSorter sorter = new DefaultSorter(context, conf, 5, handler.getMemoryAssigned());
 
     //Write 1000 keys each of size 1000, (> 1 spill should happen)