You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/09/23 14:41:03 UTC

spark git commit: [SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations

Repository: spark
Updated Branches:
  refs/heads/master 3920af7d1 -> 50ada2a4d


[SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations

## What changes were proposed in this pull request?

In several places, avoid allocating arrays larger than Integer.MAX_VALUE - 8, which is the actual maximum array size on some JVMs.

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #19266 from srowen/SPARK-22033.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50ada2a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50ada2a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50ada2a4

Branch: refs/heads/master
Commit: 50ada2a4d31609b6c828158cad8e128c2f605b8d
Parents: 3920af7
Author: Sean Owen <so...@cloudera.com>
Authored: Sat Sep 23 15:40:59 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Sep 23 15:40:59 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/unsafe/array/LongArray.java    |  2 +-
 .../spark/unsafe/map/HashMapGrowthStrategy.java |  8 +++++++-
 .../spark/util/collection/CompactBuffer.scala   | 20 ++++++++++----------
 .../util/collection/PartitionedPairBuffer.scala |  8 +++++---
 .../expressions/codegen/BufferHolder.java       | 11 ++++++++---
 .../vectorized/WritableColumnVector.java        |  2 +-
 6 files changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index 1a3cdff..2cd39bd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -39,7 +39,7 @@ public final class LongArray {
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
     this.baseObj = memory.getBaseObject();
     this.baseOffset = memory.getBaseOffset();

http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
index 20654e4..b8c2294 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
@@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
   HashMapGrowthStrategy DOUBLING = new Doubling();
 
   class Doubling implements HashMapGrowthStrategy {
+
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
     @Override
     public int nextCapacity(int currentCapacity) {
       assert (currentCapacity > 0);
+      int doubleCapacity = currentCapacity * 2;
       // Guard against overflow
-      return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
+      return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 4d43d8d..f5d2fa1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
 
   /** Increase our size to newSize and grow the backing array if needed. */
   private def growToSize(newSize: Int): Unit = {
-    if (newSize < 0) {
-      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    val arrayMax = Int.MaxValue - 8
+    if (newSize < 0 || newSize - 2 > arrayMax) {
+      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
     }
     val capacity = if (otherElements != null) otherElements.length + 2 else 2
     if (newSize > capacity) {
-      var newArrayLen = 8
+      var newArrayLen = 8L
       while (newSize - 2 > newArrayLen) {
         newArrayLen *= 2
-        if (newArrayLen == Int.MinValue) {
-          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
-          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
-          // calculation above still gives a positive integer.
-          newArrayLen = Int.MaxValue - 2
-        }
       }
-      val newArray = new Array[T](newArrayLen)
+      if (newArrayLen > arrayMax) {
+        newArrayLen = arrayMax
+      }
+      val newArray = new Array[T](newArrayLen.toInt)
       if (otherElements != null) {
         System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index f5844d5..b755e5d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
  *
- * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
+ * The buffer can support up to 1073741819 elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
       throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
     val newCapacity =
-      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
         MAXIMUM_CAPACITY
       } else {
         capacity * 2
@@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 0e4264f..971d199 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -35,6 +35,11 @@ import org.apache.spark.unsafe.Platform;
  * if the fields of row are all fixed-length, as the size of result row is also fixed.
  */
 public class BufferHolder {
+
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
   private final UnsafeRow row;
@@ -61,15 +66,15 @@ public class BufferHolder {
    * Grows the buffer by at least neededSize and points the row to the buffer.
    */
   public void grow(int neededSize) {
-    if (neededSize > Integer.MAX_VALUE - totalSize()) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
-          "exceeds size limitation " + Integer.MAX_VALUE);
+          "exceeds size limitation " + ARRAY_MAX);
     }
     final int length = totalSize() + neededSize;
     if (buffer.length < length) {
       // This will not happen frequently, because the buffer is re-used.
-      int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
       final byte[] tmp = new byte[newLength];
       Platform.copyMemory(
         buffer,

http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b4f753c..0bddc35 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -559,7 +559,7 @@ public abstract class WritableColumnVector extends ColumnVector {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org