You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/29 18:58:31 UTC
[spark] branch master updated: [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fbc3c5e [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
fbc3c5e is described below
commit fbc3c5e8a33c28e46d839a2d1db81d9a89b29327
Author: ryne.yang <ry...@acuityads.com>
AuthorDate: Tue Jan 29 10:58:10 2019 -0800
[SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
## What changes were proposed in this pull request?
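Fix an integer (Long) overflow in the Kafka source's rateLimit calculation: when the prorated per-partition limit is very large (e.g. maxOffsetsPerTrigger set to Long.MaxValue), `begin + prorateLong` could wrap around to a negative offset. The new offset is now capped at Long.MaxValue before being clamped to the partition's end offset.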
## How was this patch tested?
Passed Jenkins, including a newly added unit test covering the case where the end-offset calculation could overflow.
Closes #23666 from linehrr/master.
Authored-by: ryne.yang <ry...@acuityads.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 10 ++++++-
.../apache/spark/sql/kafka010/KafkaSource.scala | 10 ++++++-
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 35 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 3ae9bd3..337a51e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -231,7 +231,15 @@ private[kafka010] class KafkaMicroBatchStream(
val begin = from.get(tp).getOrElse(fromNew(tp))
val prorate = limit * (size / total)
// Don't completely starve small topicpartitions
- val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // need to be careful of integer overflow
+ // therefore added canary checks where to see if off variable could be overflowed
+ // refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+ val off = if (prorateLong > Long.MaxValue - begin) {
+ Long.MaxValue
+ } else {
+ begin + prorateLong
+ }
// Paranoia, make sure not to return an offset that's past end
Math.min(end, off)
}.getOrElse(end)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index da55334..624c796 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -191,7 +191,15 @@ private[kafka010] class KafkaSource(
val prorate = limit * (size / total)
logDebug(s"rateLimit $tp prorated amount is $prorate")
// Don't completely starve small topicpartitions
- val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // need to be careful of integer overflow
+ // therefore added canary checks where to see if off variable could be overflowed
+ // refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+ val off = if (prorateLong > Long.MaxValue - begin) {
+ Long.MaxValue
+ } else {
+ begin + prorateLong
+ }
logDebug(s"rateLimit $tp new offset is $off")
// Paranoia, make sure not to return an offset that's past end
Math.min(end, off)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index aa7baac..8fd5790 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -196,6 +196,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
StopStream)
}
+ test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+ "during end offset calculation") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1) // single partition receives the full limit
+ // fill in 5 messages so begin is 5 (> 0) and begin + Long.MaxValue would overflow
+ testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+ val partitionOffsets = Map(
+ new TopicPartition(topic, 0) -> 5L
+ )
+ val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ // start at the explicit offset 5 (not "latest") so that begin is 5
+ .option("startingOffsets", startingOffsets)
+ // Long.MaxValue makes the prorated amount saturate at Long.MaxValue, so begin + it overflows
+ .option("maxOffsetsPerTrigger", Long.MaxValue)
+ .option("subscribe", topic)
+ .option("kafka.metadata.max.age.ms", "1")
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 30, 31, 32, 33, 34), // new records appended after offset 5
+ CheckAnswer(30, 31, 32, 33, 34), // records 0..4 (offsets 0-4) are before the start and skipped
+ StopStream
+ )
+ }
+
test("maxOffsetsPerTrigger") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org