You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ko...@apache.org on 2018/03/16 17:26:10 UTC

spark git commit: [SPARK-18371][STREAMING] Spark Streaming backpressure generates batch with large number of records

Repository: spark
Updated Branches:
  refs/heads/master 5414abca4 -> dffeac369


[SPARK-18371][STREAMING] Spark Streaming backpressure generates batch with large number of records

## What changes were proposed in this pull request?

Omit rounding of backpressure rate. Effects:
- no batch with a large number of records is created when the rate from the PID estimator is one
- the number of records per batch and partition is more fine-grained, improving backpressure accuracy

## How was this patch tested?

This was tested by running:
- `mvn test -pl external/kafka-0-8`
- `mvn test -pl external/kafka-0-10`
- a streaming application which was suffering from the issue

JasonMWhite

The contribution is my original work and I license the work to the project under the project’s open source license.

Author: Sebastian Arzt <se...@plista.com>

Closes #17774 from arzt/kafka-back-pressure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dffeac36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dffeac36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dffeac36

Branch: refs/heads/master
Commit: dffeac3691daa620446ae949c5b147518d128e08
Parents: 5414abc
Author: Sebastian Arzt <se...@plista.com>
Authored: Fri Mar 16 12:25:58 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Fri Mar 16 12:25:58 2018 -0500

----------------------------------------------------------------------
 .../kafka010/DirectKafkaInputDStream.scala      |  6 +--
 .../kafka010/DirectKafkaStreamSuite.scala       | 48 ++++++++++++++++++
 .../kafka/DirectKafkaInputDStream.scala         |  6 +--
 .../kafka/DirectKafkaStreamSuite.scala          | 51 ++++++++++++++++++++
 4 files changed, 105 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 0fa3287..9cb2448 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -138,17 +138,17 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
         lagPerPartition.map { case (tp, lag) =>
           val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          val backpressureRate = lag / totalLag.toDouble * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
+      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toDouble }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
       })
     } else {
       None

http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 453b5e5..8524743 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -617,6 +617,54 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("maxMessagesPerPartition with zero offset and rate equal to one") {
+    val topic = "backpressure"
+    val kafkaParams = getKafkaParams()
+    val batchIntervalMilliseconds = 60000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimateRate = 1L
+    val fromOffsets = Map(
+      new TopicPartition(topic, 0) -> 0L,
+      new TopicPartition(topic, 1) -> 0L,
+      new TopicPartition(topic, 2) -> 0L,
+      new TopicPartition(topic, 3) -> 0L
+    )
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      ) {
+        currentOffsets = fromOffsets
+        override val rateController = Some(new ConstantRateController(id, null, estimateRate))
+      }
+    }
+
+    val offsets = Map[TopicPartition, Long](
+      new TopicPartition(topic, 0) -> 0,
+      new TopicPartition(topic, 1) -> 100L,
+      new TopicPartition(topic, 2) -> 200L,
+      new TopicPartition(topic, 3) -> 300L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    val expected = Map(
+      new TopicPartition(topic, 0) -> 1L,
+      new TopicPartition(topic, 1) -> 10L,
+      new TopicPartition(topic, 2) -> 20L,
+      new TopicPartition(topic, 3) -> 30L
+    )
+    assert(result.contains(expected), s"Number of messages per partition must be at least 1")
+  }
+
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index d52c230..d6dd074 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -104,17 +104,17 @@ class DirectKafkaInputDStream[
         val totalLag = lagPerPartition.values.sum
 
         lagPerPartition.map { case (tp, lag) =>
-          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          val backpressureRate = lag / totalLag.toDouble * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toDouble }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
       })
     } else {
       None

http://git-wip-us.apache.org/repos/asf/spark/blob/dffeac36/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 06ef5bc..3fea6cf 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -456,6 +456,57 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("maxMessagesPerPartition with zero offset and rate equal to one") {
+    val topic = "backpressure"
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val batchIntervalMilliseconds = 60000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimatedRate = 1L
+    val kafkaStream = withClue("Error creating direct stream") {
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val fromOffsets = Map(
+        TopicAndPartition(topic, 0) -> 0L,
+        TopicAndPartition(topic, 1) -> 0L,
+        TopicAndPartition(topic, 2) -> 0L,
+        TopicAndPartition(topic, 3) -> 0L
+      )
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, fromOffsets, messageHandler) {
+        override protected[streaming] val rateController =
+          Some(new DirectKafkaRateController(id, null) {
+            override def getLatestRate() = estimatedRate
+          })
+      }
+    }
+
+    val offsets = Map(
+      TopicAndPartition(topic, 0) -> 0L,
+      TopicAndPartition(topic, 1) -> 100L,
+      TopicAndPartition(topic, 2) -> 200L,
+      TopicAndPartition(topic, 3) -> 300L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    val expected = Map(
+      TopicAndPartition(topic, 0) -> 1L,
+      TopicAndPartition(topic, 1) -> 10L,
+      TopicAndPartition(topic, 2) -> 20L,
+      TopicAndPartition(topic, 3) -> 30L
+    )
+    assert(result.contains(expected), s"Number of messages per partition must be at least 1")
+  }
+
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org