You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/08 18:40:32 UTC

spark git commit: [SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplication

Repository: spark
Updated Branches:
  refs/heads/master 710a68cec -> 8d40a79a0


[SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplication

## What changes were proposed in this pull request?

This PR avoids a possible overflow in operations of the form `long = (long)(int * int)`. Because the multiplication is carried out in `int` arithmetic before the result is widened to `long`, the product of large positive integer values may set the most significant bit (MSB) to one. This yields a negative `long` value where a positive value was expected (e.g. `0111_0000_0000_0000 * 0000_0000_0000_0010`).

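This PR widens one operand to `long` (via a cast, `.toLong`, or an `L` literal) before the multiplication, so that the product is computed in `long` arithmetic and this situation is avoided.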

## How was this patch tested?

Existing unit tests (UTs).

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #21002 from kiszk/SPARK-23893.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d40a79a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d40a79a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d40a79a

Branch: refs/heads/master
Commit: 8d40a79a077a30024a8ef921781b68f6f7e542d1
Parents: 710a68c
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Sun Apr 8 20:40:27 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Apr 8 20:40:27 2018 +0200

----------------------------------------------------------------------
 .../spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java  | 2 +-
 .../spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java  | 2 +-
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala   | 2 +-
 .../test/scala/org/apache/spark/InternalAccumulatorSuite.scala   | 2 +-
 .../org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++--
 .../src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++--
 .../src/test/scala/org/apache/spark/sql/HashBenchmark.scala      | 2 +-
 .../test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala | 3 ++-
 .../scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala   | 2 +-
 .../columnar/compression/CompressionSchemeBenchmark.scala        | 4 ++--
 .../spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala  | 2 +-
 11 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 20a7a8b..717823e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -124,7 +124,7 @@ public final class UnsafeInMemorySorter {
     int initialSize,
     boolean canUseRadixSort) {
     this(consumer, memoryManager, recordComparator, prefixComparator,
-      consumer.allocateArray(initialSize * 2), canUseRadixSort);
+      consumer.allocateArray(initialSize * 2L), canUseRadixSort);
   }
 
   public UnsafeInMemorySorter(

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index d9f84d1..37772f4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -84,7 +84,7 @@ public final class UnsafeSortDataFormat
 
   @Override
   public LongArray allocate(int length) {
-    assert (length * 2 <= buffer.size()) :
+    assert (length * 2L <= buffer.size()) :
       "the buffer is smaller than required: " + buffer.size() + " < " + (length * 2);
     return buffer;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index c9ed12f..13db498 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -90,7 +90,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
           // by 50%. We also cap the estimation in the end.
           if (results.size == 0) {
-            numPartsToTry = partsScanned * 4
+            numPartsToTry = partsScanned * 4L
           } else {
             // the left side of max is >=1 whenever partsScanned >= 2
             numPartsToTry = Math.max(1,

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 8d7be77..62824a5 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -135,7 +135,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
       // This job runs 2 stages, and we're in the second stage. Therefore, any task attempt
       // ID that's < 2 * numPartitions belongs to the first attempt of this stage.
       val taskContext = TaskContext.get()
-      val isFirstStageAttempt = taskContext.taskAttemptId() < numPartitions * 2
+      val isFirstStageAttempt = taskContext.taskAttemptId() < numPartitions * 2L
       if (isFirstStageAttempt) {
         throw new FetchFailedException(
           SparkEnv.get.blockManager.blockManagerId,

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index fde5f25..0ba57bf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -382,8 +382,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
       writeFile(log, true, None,
         SparkListenerApplicationStart(
-          "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
-        SparkListenerApplicationEnd(5001 * i)
+          "downloadApp1", Some("downloadApp1"), 5000L * i, "test", Some(s"attempt$i")),
+        SparkListenerApplicationEnd(5001L * i)
       )
       log
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 4abbb8e..74b72d9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -317,7 +317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("SparkListenerJobStart backward compatibility") {
     // Prior to Spark 1.2.0, SparkListenerJobStart did not have a "Stage Infos" property.
     val stageIds = Seq[Int](1, 2, 3, 4)
-    val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
+    val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
     val dummyStageInfos =
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
@@ -331,7 +331,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
     // Also, SparkListenerJobEnd did not have a "Completion Time" property.
     val stageIds = Seq[Int](1, 2, 3, 4)
-    val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
+    val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40L, x * 50L))
     val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
     val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
       .removeField({ _._1 == "Submission Time"})

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
index 2d94b66..9a89e62 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala
@@ -40,7 +40,7 @@ object HashBenchmark {
       safeProjection(encoder.toRow(generator().asInstanceOf[Row])).copy()
     ).toArray
 
-    val benchmark = new Benchmark("Hash For " + name, iters * numRows)
+    val benchmark = new Benchmark("Hash For " + name, iters * numRows.toLong)
     benchmark.addCase("interpreted version") { _: Int =>
       var sum = 0
       for (_ <- 0L until iters) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
index 2a753a0..f6c8111 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala
@@ -36,7 +36,8 @@ object HashByteArrayBenchmark {
       bytes
     }
 
-    val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
+    val benchmark =
+      new Benchmark("Hash byte arrays with length " + length, iters * numArrays.toLong)
     benchmark.addCase("Murmur3_x86_32") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iters) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
index 769addf..6c63769 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
@@ -38,7 +38,7 @@ object UnsafeProjectionBenchmark {
     val iters = 1024 * 16
     val numRows = 1024 * 16
 
-    val benchmark = new Benchmark("unsafe projection", iters * numRows)
+    val benchmark = new Benchmark("unsafe projection", iters * numRows.toLong)
 
 
     val schema1 = new StructType().add("l", LongType, false)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index 9005ec9..619b76f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -77,7 +77,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
       count: Int,
       tpe: NativeColumnType[T],
       input: ByteBuffer): Unit = {
-    val benchmark = new Benchmark(name, iters * count)
+    val benchmark = new Benchmark(name, iters * count.toLong)
 
     schemes.filter(_.supports(tpe)).foreach { scheme =>
       val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, tpe, scheme, input)
@@ -101,7 +101,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
       count: Int,
       tpe: NativeColumnType[T],
       input: ByteBuffer): Unit = {
-    val benchmark = new Benchmark(name, iters * count)
+    val benchmark = new Benchmark(name, iters * count.toLong)
 
     schemes.filter(_.supports(tpe)).foreach { scheme =>
       val (compressFunc, _, buf) = prepareEncodeInternal(count, tpe, scheme, input)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d40a79a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index 1f31aa4..8aeb06d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -295,7 +295,7 @@ object ColumnarBatchBenchmark {
 
   def booleanAccess(iters: Int): Unit = {
     val count = 8 * 1024
-    val benchmark = new Benchmark("Boolean Read/Write", iters * count)
+    val benchmark = new Benchmark("Boolean Read/Write", iters * count.toLong)
     benchmark.addCase("Bitset") { i: Int => {
       val b = new BitSet(count)
       var sum = 0L


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org