You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by akonopko <gi...@git.apache.org> on 2017/10/04 18:18:26 UTC

[GitHub] spark pull request #19431: Add spark.streaming.backpressure.initialRate to d...

GitHub user akonopko opened a pull request:

    https://github.com/apache/spark/pull/19431

    Add spark.streaming.backpressure.initialRate to direct Kafka streams

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akonopko/spark SPARK-18580-initialrate

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19431
    
----

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167281427
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -21,6 +21,7 @@ import java.io.File
     import java.util.Arrays
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --
    
    [error] /Users/gaborsomogyi/spark_review/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:24:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.
    
    Please execute Scalastyle checks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167281937
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +                               initialRate: Int,
    --- End diff --
    
    Indentation is wrong. Please check Code Style Guide.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605547
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Latest rate means rate of previous batch. Is it possible that in alive system 0 events were processed? Only if there is no backlog and no new events came during last batch. Completely possible. 
    
    This happens during first ran. And this parameter should limit it during 1st ran. Quote from docs:
    
    `This is the initial maximum receiving rate at which each receiver will receive data for the
        first batch when the backpressure mechanism is enabled.`
    
    If it happened during system run, for example there is no backlog and no new events came, we still need to limit system rate since with LatestRate = 0 it results in no limit, causing danger of overflowing the system. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streamin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19431


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @gaborgsomogyi 
    `spark.streaming.backpressure.initialRate` is already documented in here: https://spark.apache.org/docs/latest/configuration.html
    But was mistakenly not included to to direct Kafka Streams


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242329
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate2"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @tdas any concerns?
    
    If @omuravskiy doesn't express any objections (since these tests are basically taken directly from his linked PR) in the next couple of days, I'm inclined to merge this.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166603416
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r158375331
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
               tp -> (if (maxRateLimitPerPartition > 0) {
                 Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
             }
    -      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    +      case None => offsets.map { case (tp, offset) => tp -> {
    --- End diff --
    
    What is the intention here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @tdas could you take a look at it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166953420
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Just caught this could be a bit simplified:
    
    >     assert(kafkaStream.maxMessagesPerPartition(input).get ==
    >      Map(new TopicPartition(topic, 0) -> 250)) // we run for half a second



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Jenkins, ok to test


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605578
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166952263
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Thanks for the info. My concern was the `LatestRate = 0` case, where limit can be lost. In the meantime taken a look at the `PIDRateEstimator` which could not produce 0 rate because of this:
    
    `
            val newRate = (latestRate - proportional * error -
                                        integral * historicalError -
                                        derivative * dError).max(minRate)
    `
    
    and minRate is limited:
    
    `
      require(
        minRate > 0,
        s"Minimum rate in PIDRateEstimator should be > 0")
    `
    
    I'm fine with this.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166953894
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Same simplification here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r173719300
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    --- End diff --
    
    Right, thank you. I will correct this


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    **[Test build #88477 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88477/testReport)** for PR 19431 at commit [`d11e807`](https://github.com/apache/spark/commit/d11e8078672048693b3538db902a2827d14eeaf5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605671
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
               tp -> (if (maxRateLimitPerPartition > 0) {
                 Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
             }
    -      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    +      case None => offsets.map { case (tp, offset) => tp -> {
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Merged to master


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167281176
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
     import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --
    
    [error] /Users/gaborsomogyi/spark_review/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala:25:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.
    
    Please execute Scalastyle checks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    I mean the difference between `test("use backpressure.initialRate with backpressure")` and `test("backpressure.initialRate should honor maxRatePerPartition")` are 3 numbers. Wrapping the common code into one function and making 2 function call would be better.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242186
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Latest rate means rate of previous batch. Is it possible that in alive system 0 events were processed? Only if there is no backlog and no new events came during last batch. Completely possible.
    
    This happens during first ran. And this parameter should limit it during 1st ran. Quote from docs:
    
    This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.
    
    If it happened during system run, for example there is no backlog and no new events came, we still need to limit system rate since with LatestRate = 0 it results in no limit, causing danger of overflowing the system.
    If somehow cluster was so heavily loaded with other processes that could process 0 events in Spark Streaming, this means that we might have huge backlog after that. Which mean without this fix system has big chance of overflowing


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r158382365
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Is it possible that the server is so heavily loaded that current rate limit drops to 0?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167395487
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +                               initialRate: Int,
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166952412
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Same applies here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r158374124
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --
    
    style: `val estimatedRateLimit = rateController.map { x => {`
    
    Please take a look at https://spark.apache.org/contributing.html



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167281995
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +                               initialRate: Int,
    --- End diff --
    
    Indentation is wrong. Please check Code Style Guide.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @akonopko thanks for this, if you can resolve merge conflict I think we can get this in


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    **[Test build #88477 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88477/testReport)** for PR 19431 at commit [`d11e807`](https://github.com/apache/spark/commit/d11e8078672048693b3538db902a2827d14eeaf5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @koeninger done! 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605640
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Latest rate means rate of previous batch. Is it possible that in alive system 0 events were processed? Only if there is no backlog and no new events came during last batch. Completely possible.
    
    This happens during first ran. And this parameter should limit it during 1st ran. Quote from docs:
    
    This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.
    
    If it happened during system run, for example there is no backlog and no new events came, we still need to limit system rate since with LatestRate = 0 it results in no limit, causing danger of overflowing the system


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242320
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88477/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @akonopko Thanks! 
    Sorry, but I just noticed the title of the PR - can you adjust it to match convention, e.g.
    
    [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backpressure.initialRate to direct Kafka streams
    
    and then I'll get it merged ;)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r158382300
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Is it possible that the server is so heavily loaded that current rate limit drops to 0?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r158375540
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --
    
    style: `val estimatedRateLimit = rateController.map { x => {`
    
    Please take a look at https://spark.apache.org/contributing.html



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    @koeninger resolved the conflict


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r173641331
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    --- End diff --
    
    Aren't the descriptions of these tests backwards, i.e. this the one testing that maxRatePerPartition is honored?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Until now it was not fully clear documented in which situation does this parameter take effect. I would personally add things into the doc to be a bit more specific.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167242353
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by omuravskiy <gi...@git.apache.org>.
Github user omuravskiy commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    No objections


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166953742
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    +      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
    +
    +    kafkaStream.stop()
    +  }
    +
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    val topic = "backpressureInitialRate2"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    +      .set("spark.streaming.backpressure.initialRate", "1000")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Same simplification here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167395493
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
     import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --
    
    Ah, they were disabled for test files. Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166953800
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
           Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
       }
     
    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val topicPartitions = Set(TopicAndPartition(topic, 0))
    +    kafkaTestUtils.createTopic(topic, 1)
    +    val kafkaParams = Map(
    +      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      val kc = new KafkaCluster(kafkaParams)
    +      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    +      val m = kc.getEarliestLeaderOffsets(topicPartitions)
    +        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
    +
    +      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
    +        ssc, kafkaParams, m, messageHandler)
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --
    
    Same simplification here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167395482
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
         ssc.stop()
       }
     
    +  test("backpressure.initialRate should honor maxRatePerPartition") {
    +    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
    +  }
    +
    +  test("use backpressure.initialRate with backpressure") {
    +    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
    +  }
    +
    +  private def backpressureTest(maxRatePerPartition: Int,
    +                               initialRate: Int,
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r167395492
  
    --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
    @@ -21,6 +21,7 @@ import java.io.File
     import java.util.Arrays
     import java.util.concurrent.ConcurrentLinkedQueue
     import java.util.concurrent.atomic.AtomicLong
    +import java.util.UUID
    --- End diff --
    
    Ah, they were disabled for test files. Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    > Related the doc I thought it's kafka specific but it's not so fine like that
    Yes, it was implemented only in Kafka Streams but doc doesnt limit usage of this parameter to Kafka
     
    > good to merge the common functionalities
    Not sure I understood you correctly here. You mean in tests ?
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/19431
  
    Jenkins, ok to test


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

Posted by akonopko <gi...@git.apache.org>.
Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166606906
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    If somehow cluster was so heavily loaded with other processes that could process 0 events in Spark Streaming, this means that we might have huge backlog after that. Which mean without this fix system has big chance of overflowing


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org