You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/09/23 14:41:03 UTC
spark git commit: [SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations
Repository: spark
Updated Branches:
refs/heads/master 3920af7d1 -> 50ada2a4d
[SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations
## What changes were proposed in this pull request?
In several places, try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual maximum array size on some JVMs.
## How was this patch tested?
Existing tests
Author: Sean Owen <so...@cloudera.com>
Closes #19266 from srowen/SPARK-22033.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50ada2a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50ada2a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50ada2a4
Branch: refs/heads/master
Commit: 50ada2a4d31609b6c828158cad8e128c2f605b8d
Parents: 3920af7
Author: Sean Owen <so...@cloudera.com>
Authored: Sat Sep 23 15:40:59 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Sep 23 15:40:59 2017 +0100
----------------------------------------------------------------------
.../apache/spark/unsafe/array/LongArray.java | 2 +-
.../spark/unsafe/map/HashMapGrowthStrategy.java | 8 +++++++-
.../spark/util/collection/CompactBuffer.scala | 20 ++++++++++----------
.../util/collection/PartitionedPairBuffer.scala | 8 +++++---
.../expressions/codegen/BufferHolder.java | 11 ++++++++---
.../vectorized/WritableColumnVector.java | 2 +-
6 files changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index 1a3cdff..2cd39bd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -39,7 +39,7 @@ public final class LongArray {
private final long length;
public LongArray(MemoryBlock memory) {
- assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+ assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
index 20654e4..b8c2294 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
@@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
HashMapGrowthStrategy DOUBLING = new Doubling();
class Doubling implements HashMapGrowthStrategy {
+
+ // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+ // smaller. Be conservative and lower the cap a little.
+ private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
@Override
public int nextCapacity(int currentCapacity) {
assert (currentCapacity > 0);
+ int doubleCapacity = currentCapacity * 2;
// Guard against overflow
- return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
+ return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 4d43d8d..f5d2fa1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
/** Increase our size to newSize and grow the backing array if needed. */
private def growToSize(newSize: Int): Unit = {
- if (newSize < 0) {
- throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+ // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+ // smaller. Be conservative and lower the cap a little.
+ val arrayMax = Int.MaxValue - 8
+ if (newSize < 0 || newSize - 2 > arrayMax) {
+ throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
}
val capacity = if (otherElements != null) otherElements.length + 2 else 2
if (newSize > capacity) {
- var newArrayLen = 8
+ var newArrayLen = 8L
while (newSize - 2 > newArrayLen) {
newArrayLen *= 2
- if (newArrayLen == Int.MinValue) {
- // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
- // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
- // calculation above still gives a positive integer.
- newArrayLen = Int.MaxValue - 2
- }
}
- val newArray = new Array[T](newArrayLen)
+ if (newArrayLen > arrayMax) {
+ newArrayLen = arrayMax
+ }
+ val newArray = new Array[T](newArrayLen.toInt)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index f5844d5..b755e5d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
* Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
* of its estimated size in bytes.
*
- * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
+ * The buffer can support up to 1073741819 elements.
*/
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
extends WritablePartitionedPairCollection[K, V] with SizeTracker
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
}
val newCapacity =
- if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+ if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
MAXIMUM_CAPACITY
} else {
capacity * 2
@@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
}
private object PartitionedPairBuffer {
- val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+ // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+ // smaller. Be conservative and lower the cap a little.
+ val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 0e4264f..971d199 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -35,6 +35,11 @@ import org.apache.spark.unsafe.Platform;
* if the fields of row are all fixed-length, as the size of result row is also fixed.
*/
public class BufferHolder {
+
+ // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+ // smaller. Be conservative and lower the cap a little.
+ private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
private final UnsafeRow row;
@@ -61,15 +66,15 @@ public class BufferHolder {
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
public void grow(int neededSize) {
- if (neededSize > Integer.MAX_VALUE - totalSize()) {
+ if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
- "exceeds size limitation " + Integer.MAX_VALUE);
+ "exceeds size limitation " + ARRAY_MAX);
}
final int length = totalSize() + neededSize;
if (buffer.length < length) {
// This will not happen frequently, because the buffer is re-used.
- int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
+ int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
final byte[] tmp = new byte[newLength];
Platform.copyMemory(
buffer,
http://git-wip-us.apache.org/repos/asf/spark/blob/50ada2a4/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b4f753c..0bddc35 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -559,7 +559,7 @@ public abstract class WritableColumnVector extends ColumnVector {
* Upper limit for the maximum capacity for this column.
*/
@VisibleForTesting
- protected int MAX_CAPACITY = Integer.MAX_VALUE;
+ protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org