Posted to commits@spark.apache.org by ko...@apache.org on 2018/03/16 17:26:10 UTC
spark git commit: [SPARK-18371][STREAMING] Spark Streaming backpressure generates batch with large number of records
Repository: spark
Updated Branches:
refs/heads/master 5414abca4 -> dffeac369
[SPARK-18371][STREAMING] Spark Streaming backpressure generates batch with large number of records
## What changes were proposed in this pull request?
Omit rounding of the backpressure rate. Effects:
- no batch with a large number of records is created when the rate from the PID estimator is one (see the sketch below)
- the number of records per batch and partition is more fine-grained, improving backpressure accuracy
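
To make the failure mode concrete, here is a minimal, self-contained sketch of the arithmetic before and after the change (the partition lags, rate, and batch length are hypothetical, not taken from the patch): with an estimated rate of 1 record/sec, `Math.round` on a `Float` maps every per-partition share below 0.5 to zero, the summed limit becomes zero, and the rate limit is dropped entirely, yielding an unbounded batch.

```scala
// Minimal sketch; all numbers are hypothetical.
object BackpressureRoundingSketch extends App {
  val rate = 1L                   // rate from the PID estimator (records/sec)
  val secsPerBatch = 60.0         // 60 s batch interval
  val lags = Map("tp0" -> 100L, "tp1" -> 150L, "tp2" -> 150L, "tp3" -> 200L)
  val totalLag = lags.values.sum  // 600

  // Before: every share below 0.5 rounds to 0, the sum is 0,
  // and the limit is silently discarded (unbounded batch).
  val rounded = lags.map { case (tp, lag) => tp -> Math.round(lag / totalLag.toFloat * rate) }
  println(rounded.values.sum)     // 0

  // After: keep the fractional rate and floor each partition at one record.
  val limits = lags.map { case (tp, lag) =>
    val backpressureRate = lag / totalLag.toDouble * rate
    tp -> Math.max((secsPerBatch * backpressureRate).toLong, 1L)
  }
  println(limits)                 // Map(tp0 -> 10, tp1 -> 15, tp2 -> 15, tp3 -> 20)
}
```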
## How was this patch tested?
This was tested by running:
- `mvn test -pl external/kafka-0-8`
- `mvn test -pl external/kafka-0-10`
- a streaming application which was suffering from the issue (a sketch of the relevant configuration follows)
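
For reference, a minimal, hypothetical configuration for such an application (the property names are real Spark settings; the values are illustrative only):

```scala
import org.apache.spark.SparkConf

// Hypothetical repro configuration; values are illustrative.
val conf = new SparkConf()
  .setAppName("backpressure-repro")
  .set("spark.streaming.backpressure.enabled", "true")      // enable the PID rate estimator
  .set("spark.streaming.kafka.maxRatePerPartition", "100")  // per-partition cap (records/sec)
```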
JasonMWhite
The contribution is my original work and I license the work to the project under the project’s open source license.
Author: Sebastian Arzt <se...@plista.com>
Closes #17774 from arzt/kafka-back-pressure.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dffeac36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dffeac36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dffeac36
Branch: refs/heads/master
Commit: dffeac3691daa620446ae949c5b147518d128e08
Parents: 5414abc
Author: Sebastian Arzt <se...@plista.com>
Authored: Fri Mar 16 12:25:58 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Fri Mar 16 12:25:58 2018 -0500
----------------------------------------------------------------------
.../kafka010/DirectKafkaInputDStream.scala | 6 +--
.../kafka010/DirectKafkaStreamSuite.scala | 48 ++++++++++++++++++
.../kafka/DirectKafkaInputDStream.scala | 6 +--
.../kafka/DirectKafkaStreamSuite.scala | 51 ++++++++++++++++++++
4 files changed, 105 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 0fa3287..9cb2448 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -138,17 +138,17 @@ private[spark] class DirectKafkaInputDStream[K, V](
lagPerPartition.map { case (tp, lag) =>
val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
- val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ val backpressureRate = lag / totalLag.toDouble * rate
tp -> (if (maxRateLimitPerPartition > 0) {
Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
}
- case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
+ case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toDouble }
}
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
- case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
})
} else {
None
http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 453b5e5..8524743 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -617,6 +617,54 @@ class DirectKafkaStreamSuite
ssc.stop()
}
+ test("maxMessagesPerPartition with zero offset and rate equal to one") {
+ val topic = "backpressure"
+ val kafkaParams = getKafkaParams()
+ val batchIntervalMilliseconds = 60000
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+ val estimateRate = 1L
+ val fromOffsets = Map(
+ new TopicPartition(topic, 0) -> 0L,
+ new TopicPartition(topic, 1) -> 0L,
+ new TopicPartition(topic, 2) -> 0L,
+ new TopicPartition(topic, 3) -> 0L
+ )
+ val kafkaStream = withClue("Error creating direct stream") {
+ new DirectKafkaInputDStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+ new DefaultPerPartitionConfig(sparkConf)
+ ) {
+ currentOffsets = fromOffsets
+ override val rateController = Some(new ConstantRateController(id, null, estimateRate))
+ }
+ }
+
+ val offsets = Map[TopicPartition, Long](
+ new TopicPartition(topic, 0) -> 0,
+ new TopicPartition(topic, 1) -> 100L,
+ new TopicPartition(topic, 2) -> 200L,
+ new TopicPartition(topic, 3) -> 300L
+ )
+ val result = kafkaStream.maxMessagesPerPartition(offsets)
+ val expected = Map(
+ new TopicPartition(topic, 0) -> 1L,
+ new TopicPartition(topic, 1) -> 10L,
+ new TopicPartition(topic, 2) -> 20L,
+ new TopicPartition(topic, 3) -> 30L
+ )
+ assert(result.contains(expected), s"Number of messages per partition must be at least 1")
+ }
+
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index d52c230..d6dd074 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -104,17 +104,17 @@ class DirectKafkaInputDStream[
val totalLag = lagPerPartition.values.sum
lagPerPartition.map { case (tp, lag) =>
- val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ val backpressureRate = lag / totalLag.toDouble * rate
tp -> (if (maxRateLimitPerPartition > 0) {
Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
}
- case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toDouble }
}
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
- case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
})
} else {
None
http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 06ef5bc..3fea6cf 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -456,6 +456,57 @@ class DirectKafkaStreamSuite
ssc.stop()
}
+ test("maxMessagesPerPartition with zero offset and rate equal to one") {
+ val topic = "backpressure"
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ val batchIntervalMilliseconds = 60000
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+ val estimatedRate = 1L
+ val kafkaStream = withClue("Error creating direct stream") {
+ val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+ val fromOffsets = Map(
+ TopicAndPartition(topic, 0) -> 0L,
+ TopicAndPartition(topic, 1) -> 0L,
+ TopicAndPartition(topic, 2) -> 0L,
+ TopicAndPartition(topic, 3) -> 0L
+ )
+ new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+ ssc, kafkaParams, fromOffsets, messageHandler) {
+ override protected[streaming] val rateController =
+ Some(new DirectKafkaRateController(id, null) {
+ override def getLatestRate() = estimatedRate
+ })
+ }
+ }
+
+ val offsets = Map(
+ TopicAndPartition(topic, 0) -> 0L,
+ TopicAndPartition(topic, 1) -> 100L,
+ TopicAndPartition(topic, 2) -> 200L,
+ TopicAndPartition(topic, 3) -> 300L
+ )
+ val result = kafkaStream.maxMessagesPerPartition(offsets)
+ val expected = Map(
+ TopicAndPartition(topic, 0) -> 1L,
+ TopicAndPartition(topic, 1) -> 10L,
+ TopicAndPartition(topic, 2) -> 20L,
+ TopicAndPartition(topic, 3) -> 30L
+ )
+ assert(result.contains(expected), s"Number of messages per partition must be at least 1")
+ }
+
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {