Posted to reviews@spark.apache.org by rezasafi <gi...@git.apache.org> on 2018/08/24 16:50:37 UTC

[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

GitHub user rezasafi opened a pull request:

    https://github.com/apache/spark/pull/22223

    SPARK-25233. Give the user the option of specifying a fixed minimum message per partition per batch when using kafka direct API with backpressure

    After SPARK-18371, the direct Kafka API guarantees at least one message per partition per batch whenever new messages exist in the topics. This change gives the user the option of setting that minimum instead of relying on the hard-coded limit of 1.
    The related unit test is updated, and internal tests verified that topic partitions with new messages advance by the specified minimum.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rezasafi/spark streaminglag

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22223
    
----
commit 067492485420216001266a4bb74fc4f10d9a4a99
Author: Reza Safi <re...@...>
Date:   2018-08-24T16:37:14Z

    SPARK-25233. Give the user the option of specifying a fixed minimum message per partition per batch when using kafka direct API with backpressure
    After SPARK-18371, it is guaranteed that there would be at least one message per partition per batch using direct kafka API when new messages exist in the topics. This change will give the user the option of setting the minimum instead of just a hard coded 1 limit.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4296 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4296/testReport)** for PR 22223 at commit [`85ece1c`](https://github.com/apache/spark/commit/85ece1c0866164a3f5a260b6e226b01c1fd1dd81).


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4295 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4295/testReport)** for PR 22223 at commit [`2b0b1ce`](https://github.com/apache/spark/commit/2b0b1ce3876e2f55807156a98f75068280e03054).
     * This patch **fails MiMa tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Thank you very much @koeninger 


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4303 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4303/testReport)** for PR 22223 at commit [`6f8d15a`](https://github.com/apache/spark/commit/6f8d15a86926eb3f34bdf9c58a1dc1d44749abb8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    I think since the default behavior is still 1, it's probably ok to let
    someone do what they want here
    
    On Wed, Aug 29, 2018 at 3:51 PM, rezasafi <no...@github.com> wrote:
    
    > *@rezasafi* commented on this pull request.
    > ------------------------------
    >
    > In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
    > DirectKafkaInputDStream.scala
    > <https://github.com/apache/spark/pull/22223#discussion_r213829668>:
    >
    > > @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
    >      if (effectiveRateLimitPerPartition.values.sum > 0) {
    >        val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
    >        Some(effectiveRateLimitPerPartition.map {
    > -        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
    > +        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
    > +          Math.max(ppc.minRatePerPartition(tp), 1L))
    >
    > I just didn't want to break the reasoning behind SPARK-18371 to have at
    > least 1 always. I didn't have any other reason for this. I can change it to
    > give the user the freedom.



---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Jenkins, retest this please


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4303 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4303/testReport)** for PR 22223 at commit [`6f8d15a`](https://github.com/apache/spark/commit/6f8d15a86926eb3f34bdf9c58a1dc1d44749abb8).


---



[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r212693216
  
    --- Diff: docs/configuration.md ---
    @@ -1925,6 +1925,14 @@ showDF(properties, numRows = 200, truncate = FALSE)
         first batch when the backpressure mechanism is enabled.
       </td>
     </tr>
    +<tr>
    +  <td><code>spark.streaming.backpressure.fixedMinMessagePerPartition</code></td>
    --- End diff --
    
    Yeah, that makes sense. I will apply your suggestion. Thanks.


---



[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r213829668
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
         if (effectiveRateLimitPerPartition.values.sum > 0) {
           val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
           Some(effectiveRateLimitPerPartition.map {
    -        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
    +        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
    +          Math.max(ppc.minRatePerPartition(tp), 1L))
    --- End diff --
    
    I just didn't want to break the reasoning behind SPARK-18371 to have at least 1 always. I didn't have any other reason for this. I can change it to give the user the freedom.


---



[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r212691622
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -141,10 +143,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
               tp -> Math.max(offset - currentOffsets(tp), 0)
             }
             val totalLag = lagPerPartition.values.sum
    -
             lagPerPartition.map { case (tp, lag) =>
               val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
    -          val backpressureRate = lag / totalLag.toDouble * rate
    +          var backpressureRate = lag / totalLag.toDouble * rate
    --- End diff --
    
    Why was this changed to a var?
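
For context, here is a minimal sketch of the lag-proportional split this hunk belongs to. It assumes a hypothetical `TopicPartition` stand-in (so no Kafka dependency is needed) and hypothetical names `BackpressureSketch` and `ratePerPartition`. The cap at the per-partition maximum and the zero-lag guard are assumptions added for illustration and are not shown in the diff. The point is that `backpressureRate` is computed once and never reassigned, so a `val` is sufficient.

```scala
// Hypothetical stand-in for Kafka's TopicPartition, so the sketch has no external dependency.
final case class TopicPartition(topic: String, partition: Int)

object BackpressureSketch {
  /** Splits an estimated total rate across partitions in proportion to each partition's lag.
    * A positive value returned by maxRate caps the share of that partition (assumed behavior).
    */
  def ratePerPartition(
      lagPerPartition: Map[TopicPartition, Long],
      rate: Double,
      maxRate: TopicPartition => Long): Map[TopicPartition, Double] = {
    val totalLag = lagPerPartition.values.sum
    lagPerPartition.map { case (tp, lag) =>
      val maxRateLimitPerPartition = maxRate(tp)
      // Computed once and never reassigned, so an immutable val is enough.
      // The zero-lag guard is an addition of this sketch, not part of the diff.
      val backpressureRate =
        if (totalLag > 0) lag / totalLag.toDouble * rate else 0.0
      val capped =
        if (maxRateLimitPerPartition > 0) {
          Math.min(backpressureRate, maxRateLimitPerPartition.toDouble)
        } else {
          backpressureRate
        }
      tp -> capped
    }
  }
}
```

With lags of 30 and 10 and an estimated rate of 100, the first partition receives three quarters of the rate and the second one quarter, unless a positive maximum caps either share.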


---



[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Jenkins, ok to test



---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4286 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4286/testReport)** for PR 22223 at commit [`f37b7e8`](https://github.com/apache/spark/commit/f37b7e81bd3b59afe859df3bec597ff9cafe1bbf).


---



[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4295 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4295/testReport)** for PR 22223 at commit [`2b0b1ce`](https://github.com/apache/spark/commit/2b0b1ce3876e2f55807156a98f75068280e03054).


---



[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    @koeninger @arzt, I would appreciate your feedback here. Thank you in advance.


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4286 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4286/testReport)** for PR 22223 at commit [`f37b7e8`](https://github.com/apache/spark/commit/f37b7e81bd3b59afe859df3bec597ff9cafe1bbf).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Thanks, merging to master


---



[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r212692983
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -141,10 +143,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
               tp -> Math.max(offset - currentOffsets(tp), 0)
             }
             val totalLag = lagPerPartition.values.sum
    -
             lagPerPartition.map { case (tp, lag) =>
               val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
    -          val backpressureRate = lag / totalLag.toDouble * rate
    +          var backpressureRate = lag / totalLag.toDouble * rate
    --- End diff --
    
    Sorry, that was a leftover from another change I was trying out, and I forgot to revert it. I will fix this.


---



[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r212691455
  
    --- Diff: docs/configuration.md ---
    @@ -1925,6 +1925,14 @@ showDF(properties, numRows = 200, truncate = FALSE)
         first batch when the backpressure mechanism is enabled.
       </td>
     </tr>
    +<tr>
    +  <td><code>spark.streaming.backpressure.fixedMinMessagePerPartition</code></td>
    --- End diff --
    
    This only applies to Kafka.  Why not namespace it under spark.streaming.kafka?
    
    What does the word "fixed" add to the explanation?
    
    There's already a maxRatePerPartition, why not minRatePerPartition?
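
A minimal sketch of what the suggested naming could look like, assuming the minimum sits next to the existing maximum on a per-partition config. Everything here is hypothetical: `TopicPartition` is a stand-in for the Kafka class, `PerPartitionConfigSketch` and `MapBackedConfig` are illustrative names, a plain `Map` stands in for `SparkConf`, and the key `spark.streaming.kafka.minRatePerPartition` is simply the name this comment proposes, not something confirmed elsewhere in the thread.

```scala
// Hypothetical stand-in for Kafka's TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Sketch of a per-partition config exposing a minimum alongside the existing maximum.
abstract class PerPartitionConfigSketch extends Serializable {
  def maxRatePerPartition(tp: TopicPartition): Long
  // Defaulting to 1 preserves the SPARK-18371 behavior for implementations
  // that do not override this method.
  def minRatePerPartition(tp: TopicPartition): Long = 1L
}

// Reads both limits from a plain key/value map (a stand-in for SparkConf).
class MapBackedConfig(conf: Map[String, String]) extends PerPartitionConfigSketch {
  private val maxRate =
    conf.getOrElse("spark.streaming.kafka.maxRatePerPartition", "0").toLong
  // Assumed key, following the namespacing suggested in the review comment.
  private val minRate =
    conf.getOrElse("spark.streaming.kafka.minRatePerPartition", "1").toLong

  override def maxRatePerPartition(tp: TopicPartition): Long = maxRate
  override def minRatePerPartition(tp: TopicPartition): Long = minRate
}
```

Keeping both settings under the same prefix and with parallel names makes it clear that they bound the same per-partition quantity from opposite sides.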


---



[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r213825892
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
         if (effectiveRateLimitPerPartition.values.sum > 0) {
           val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
           Some(effectiveRateLimitPerPartition.map {
    -        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
    +        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
    +          Math.max(ppc.minRatePerPartition(tp), 1L))
    --- End diff --
    
    Is the second Math.max actually necessary?
    The default implementation of minRatePerPartition will be 1 anyway.
    If someone makes a custom implementation that, e.g., returns zero, should they get what they asked for?
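
To make the question concrete, here is a small sketch comparing the two clamping variants. The object and method names (`MinClampSketch`, `fromRate`, `withHardFloor`, `withConfiguredMin`) are hypothetical. The arithmetic mirrors the expressions in the diff, with the batch duration passed in directly rather than read from the streaming context.

```scala
object MinClampSketch {
  // Messages allowed for one partition in one batch, before any minimum is applied.
  def fromRate(limit: Double, batchMillis: Long): Long = {
    val secsPerBatch = batchMillis.toDouble / 1000
    (secsPerBatch * limit).toLong
  }

  // Variant in the diff: the configured minimum is itself floored at 1.
  def withHardFloor(limit: Double, batchMillis: Long, minRate: Long): Long =
    Math.max(fromRate(limit, batchMillis), Math.max(minRate, 1L))

  // Variant raised in the review: the configured minimum is used as given.
  def withConfiguredMin(limit: Double, batchMillis: Long, minRate: Long): Long =
    Math.max(fromRate(limit, batchMillis), minRate)
}
```

The two variants differ only when the configured minimum is below 1. For a rate limit of 0.4 messages per second and a one-second batch, a custom minimum of 0 yields 1 message under the first variant and 0 under the second. With the default minimum of 1 they agree.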


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    **[Test build #4296 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4296/testReport)** for PR 22223 at commit [`85ece1c`](https://github.com/apache/spark/commit/85ece1c0866164a3f5a260b6e226b01c1fd1dd81).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Thank you very much @koeninger. I appreciate it.


---



[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22223


---



[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22223
  
    Can one of the admins verify this patch?


---
