You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/08 18:40:32 UTC
spark git commit: [SPARK-23893][CORE][SQL] Avoid possible integer
overflow in multiplication
Repository: spark
Updated Branches:
refs/heads/master 710a68cec -> 8d40a79a0
[SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplication
## What changes were proposed in this pull request?
This PR avoids possible overflow at an operation `long = (long)(int * int)`. Multiplying two large positive integer values may set the most significant bit (MSB) of the 32-bit intermediate result to one. This leads to a negative value in the long where a positive value was expected (e.g. `0111_0000_0000_0000 * 0000_0000_0000_0010`).
This PR performs a long cast before the multiplication to avoid this situation.
## How was this patch tested?
Existing UTs
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #21002 from kiszk/SPARK-23893.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d40a79a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d40a79a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d40a79a
Branch: refs/heads/master
Commit: 8d40a79a077a30024a8ef921781b68f6f7e542d1
Parents: 710a68c
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Sun Apr 8 20:40:27 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Apr 8 20:40:27 2018 +0200
----------------------------------------------------------------------
.../spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java | 2 +-
.../spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java | 2 +-
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 +-
.../test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 2 +-
.../org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++--
.../src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++--
.../src/test/scala/org/apache/spark/sql/HashBenchmark.scala | 2 +-
.../test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala | 3 ++-
.../scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala | 2 +-
.../columnar/compression/CompressionSchemeBenchmark.scala | 4 ++--
.../spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala | 2 +-
11 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 20a7a8b..717823e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -124,7 +124,7 @@ public final class UnsafeInMemorySorter {
int initialSize,
boolean canUseRadixSort) {
this(consumer, memoryManager, recordComparator, prefixComparator,
- consumer.allocateArray(initialSize * 2), canUseRadixSort);
+ consumer.allocateArray(initialSize * 2L), canUseRadixSort);
}
public UnsafeInMemorySorter(
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index d9f84d1..37772f4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -84,7 +84,7 @@ public final class UnsafeSortDataFormat
@Override
public LongArray allocate(int length) {
- assert (length * 2 <= buffer.size()) :
+ assert (length * 2L <= buffer.size()) :
"the buffer is smaller than required: " + buffer.size() + " < " + (length * 2);
return buffer;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index c9ed12f..13db498 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -90,7 +90,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%. We also cap the estimation in the end.
if (results.size == 0) {
- numPartsToTry = partsScanned * 4
+ numPartsToTry = partsScanned * 4L
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 8d7be77..62824a5 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -135,7 +135,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
// This job runs 2 stages, and we're in the second stage. Therefore, any task attempt
// ID that's < 2 * numPartitions belongs to the first attempt of this stage.
val taskContext = TaskContext.get()
- val isFirstStageAttempt = taskContext.taskAttemptId() < numPartitions * 2
+ val isFirstStageAttempt = taskContext.taskAttemptId() < numPartitions * 2L
if (isFirstStageAttempt) {
throw new FetchFailedException(
SparkEnv.get.blockManager.blockManagerId,
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index fde5f25..0ba57bf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -382,8 +382,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
writeFile(log, true, None,
SparkListenerApplicationStart(
- "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
- SparkListenerApplicationEnd(5001 * i)
+ "downloadApp1", Some("downloadApp1"), 5000L * i, "test", Some(s"attempt$i")),
+ SparkListenerApplicationEnd(5001L * i)
)
log
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 4abbb8e..74b72d9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -317,7 +317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
test("SparkListenerJobStart backward compatibility") {
// Prior to Spark 1.2.0, SparkListenerJobStart did not have a "Stage Infos" property.
val stageIds = Seq[Int](1, 2, 3, 4)
- val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
+ val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
val dummyStageInfos =
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
@@ -331,7 +331,7 @@ class JsonProtocolSuite extends SparkFunSuite {
// Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
// Also, SparkListenerJobEnd did not have a "Completion Time" property.
val stageIds = Seq[Int](1, 2, 3, 4)
- val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
+ val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40L, x * 50L))
val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
.removeField({ _._1 == "Submission Time"})
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
index 2d94b66..9a89e62 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
@@ -40,7 +40,7 @@ object HashBenchmark {
safeProjection(encoder.toRow(generator().asInstanceOf[Row])).copy()
).toArray
- val benchmark = new Benchmark("Hash For " + name, iters * numRows)
+ val benchmark = new Benchmark("Hash For " + name, iters * numRows.toLong)
benchmark.addCase("interpreted version") { _: Int =>
var sum = 0
for (_ <- 0L until iters) {
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
index 2a753a0..f6c8111 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
@@ -36,7 +36,8 @@ object HashByteArrayBenchmark {
bytes
}
- val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
+ val benchmark =
+ new Benchmark("Hash byte arrays with length " + length, iters * numArrays.toLong)
benchmark.addCase("Murmur3_x86_32") { _: Int =>
var sum = 0L
for (_ <- 0L until iters) {
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
index 769addf..6c63769 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
@@ -38,7 +38,7 @@ object UnsafeProjectionBenchmark {
val iters = 1024 * 16
val numRows = 1024 * 16
- val benchmark = new Benchmark("unsafe projection", iters * numRows)
+ val benchmark = new Benchmark("unsafe projection", iters * numRows.toLong)
val schema1 = new StructType().add("l", LongType, false)
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index 9005ec9..619b76f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -77,7 +77,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
count: Int,
tpe: NativeColumnType[T],
input: ByteBuffer): Unit = {
- val benchmark = new Benchmark(name, iters * count)
+ val benchmark = new Benchmark(name, iters * count.toLong)
schemes.filter(_.supports(tpe)).foreach { scheme =>
val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, tpe, scheme, input)
@@ -101,7 +101,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
count: Int,
tpe: NativeColumnType[T],
input: ByteBuffer): Unit = {
- val benchmark = new Benchmark(name, iters * count)
+ val benchmark = new Benchmark(name, iters * count.toLong)
schemes.filter(_.supports(tpe)).foreach { scheme =>
val (compressFunc, _, buf) = prepareEncodeInternal(count, tpe, scheme, input)
http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index 1f31aa4..8aeb06d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -295,7 +295,7 @@ object ColumnarBatchBenchmark {
def booleanAccess(iters: Int): Unit = {
val count = 8 * 1024
- val benchmark = new Benchmark("Boolean Read/Write", iters * count)
+ val benchmark = new Benchmark("Boolean Read/Write", iters * count.toLong)
benchmark.addCase("Bitset") { i: Int => {
val b = new BitSet(count)
var sum = 0L
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org