You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/29 18:58:31 UTC

[spark] branch master updated: [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fbc3c5e  [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
fbc3c5e is described below

commit fbc3c5e8a33c28e46d839a2d1db81d9a89b29327
Author: ryne.yang <ry...@acuityads.com>
AuthorDate: Tue Jan 29 10:58:10 2019 -0800

    [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
    
    ## What changes were proposed in this pull request?
    
    Fix the integer overflow issue in rateLimit.
    
    ## How was this patch tested?
    
    Passes Jenkins with a newly added unit test covering the case where the end offset calculation could overflow.
    
    Closes #23666 from linehrr/master.
    
    Authored-by: ryne.yang <ry...@acuityads.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 10 ++++++-
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 10 ++++++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 35 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 3ae9bd3..337a51e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -231,7 +231,15 @@ private[kafka010] class KafkaMicroBatchStream(
             val begin = from.get(tp).getOrElse(fromNew(tp))
             val prorate = limit * (size / total)
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Guard against Long overflow: begin + prorateLong may exceed Long.MaxValue,
+            // in which case the offset is clamped to Long.MaxValue instead of wrapping around.
+            // See https://issues.apache.org/jira/browse/SPARK-26718
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
           }.getOrElse(end)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index da55334..624c796 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -191,7 +191,15 @@ private[kafka010] class KafkaSource(
             val prorate = limit * (size / total)
             logDebug(s"rateLimit $tp prorated amount is $prorate")
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Guard against Long overflow: begin + prorateLong may exceed Long.MaxValue,
+            // in which case the offset is clamped to Long.MaxValue instead of wrapping around.
+            // See https://issues.apache.org/jira/browse/SPARK-26718
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             logDebug(s"rateLimit $tp new offset is $off")
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index aa7baac..8fd5790 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -196,6 +196,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       StopStream)
   }
 
+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+    "during end offset calculation") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    // fill in 5 messages to trigger potential integer overflow
+    testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> 5L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      // start from explicit offset 5 (just past the 5 messages sent above) to force begin to be 5
+      .option("startingOffsets", startingOffsets)
+      // use Long.MaxValue as the rate limit to try to trigger overflow
+      .option("maxOffsetsPerTrigger", Long.MaxValue)
+      .option("subscribe", topic)
+      .option("kafka.metadata.max.age.ms", "1")
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34),
+      CheckAnswer(30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
   test("maxOffsetsPerTrigger") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org