You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/05/24 03:18:17 UTC
spark git commit: [SPARK-24257][SQL] LongToUnsafeRowMap calculate the
new size may be wrong
Repository: spark
Updated Branches:
refs/heads/master 230f14419 -> 888340151
[SPARK-24257][SQL] LongToUnsafeRowMap calculate the new size may be wrong
LongToUnsafeRowMap has a mistake when growing its page array: it blindly grows to `oldSize * 2`, while the new record may be larger than `oldSize * 2`. Then we may have a malformed UnsafeRow when querying this map, whose actual data is smaller than its declared size, and the data is corrupted.
Author: sychen <sy...@ctrip.com>
Closes #21311 from cxzl25/fix_LongToUnsafeRowMap_page_size.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88834015
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88834015
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88834015
Branch: refs/heads/master
Commit: 888340151f737bb68d0e419b1e949f11469881f9
Parents: 230f144
Author: sychen <sy...@ctrip.com>
Authored: Thu May 24 11:02:09 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu May 24 11:18:07 2018 +0800
----------------------------------------------------------------------
.../sql/execution/joins/HashedRelation.scala | 38 ++++++++++++--------
.../execution/joins/HashedRelationSuite.scala | 26 +++++++++++++-
2 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/88834015/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 1465346..20ce01f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -557,7 +557,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
def append(key: Long, row: UnsafeRow): Unit = {
val sizeInBytes = row.getSizeInBytes
if (sizeInBytes >= (1 << SIZE_BITS)) {
- sys.error("Does not support row that is larger than 256M")
+ throw new UnsupportedOperationException("Does not support row that is larger than 256M")
}
if (key < minKey) {
@@ -567,19 +567,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
maxKey = key
}
- // There is 8 bytes for the pointer to next value
- if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
- val used = page.length
- if (used >= (1 << 30)) {
- sys.error("Can not build a HashedRelation that is larger than 8G")
- }
- ensureAcquireMemory(used * 8L * 2)
- val newPage = new Array[Long](used * 2)
- Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
- cursor - Platform.LONG_ARRAY_OFFSET)
- page = newPage
- freeMemory(used * 8L)
- }
+ grow(row.getSizeInBytes)
// copy the bytes of UnsafeRow
val offset = cursor
@@ -615,7 +603,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
growArray()
} else if (numKeys > array.length / 2 * 0.75) {
// The fill ratio should be less than 0.75
- sys.error("Cannot build HashedRelation with more than 1/3 billions unique keys")
+ throw new UnsupportedOperationException(
+ "Cannot build HashedRelation with more than 1/3 billions unique keys")
}
}
} else {
@@ -626,6 +615,25 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}
+ private def grow(inputRowSize: Int): Unit = {
+ // There is 8 bytes for the pointer to next value
+ val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
+ if (neededNumWords > page.length) {
+ if (neededNumWords > (1 << 30)) {
+ throw new UnsupportedOperationException(
+ "Can not build a HashedRelation that is larger than 8G")
+ }
+ val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
+ ensureAcquireMemory(newNumWords * 8L)
+ val newPage = new Array[Long](newNumWords.toInt)
+ Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
+ cursor - Platform.LONG_ARRAY_OFFSET)
+ val used = page.length
+ page = newPage
+ freeMemory(used * 8L)
+ }
+ }
+
private def growArray(): Unit = {
var old_array = array
val n = array.length
http://git-wip-us.apache.org/repos/asf/spark/blob/88834015/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 51f8c33..037cc2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.CompactBuffer
@@ -254,6 +254,30 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
map.free()
}
+ test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
+ val taskMemoryManager = new TaskMemoryManager(
+ new StaticMemoryManager(
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+ Long.MaxValue,
+ Long.MaxValue,
+ 1),
+ 0)
+ val unsafeProj = UnsafeProjection.create(Array[DataType](StringType))
+ val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+
+ val key = 0L
+ // the page array is initialized with length 1 << 17 (1M bytes),
+ // so here we need a value larger than 1 << 18 (2M bytes), to trigger the bug
+ val bigStr = UTF8String.fromString("x" * (1 << 19))
+
+ map.append(key, unsafeProj(InternalRow(bigStr)))
+ map.optimize()
+
+ val resultRow = new UnsafeRow(1)
+ assert(map.getValue(key, resultRow).getUTF8String(0) === bigStr)
+ map.free()
+ }
+
test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org