Posted to reviews@spark.apache.org by YuvalItzchakov <gi...@git.apache.org> on 2018/08/04 07:31:42 UTC

[GitHub] spark pull request #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when ...

GitHub user YuvalItzchakov opened a pull request:

    https://github.com/apache/spark/pull/21997

    [SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition

    ## What changes were proposed in this pull request?
    
    This small fix adds a `consumer.release()` call to `KafkaSourceRDD` for the case where we've retrieved offsets from Kafka but `fromOffset` is equal to `lastOffset`, meaning there is no new data to read for that topic partition. Until now, we would just return an empty iterator without releasing the consumer, which caused a file descriptor (FD) leak.
    
    If accepted, this pull request should be merged into master as well.
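
A minimal sketch of the control flow this fix targets. It rests on stated assumptions: `PooledConsumer`, `OffsetRange`, `untilOffset` and `computePartition` are hypothetical simplifications written for illustration, not Spark's actual `KafkaSourceRDD` or `KafkaDataConsumer` code. The point it shows is that the early-exit path for an empty range must hand the consumer back just like the normal path does.

```scala
// Hypothetical stand-ins: these are simplified placeholders, not the real
// KafkaDataConsumer / KafkaSourceRDD types.
trait PooledConsumer {
  def release(): Unit
}

final case class OffsetRange(fromOffset: Long, untilOffset: Long)

object EmptyRangeFix {
  // When there is nothing to read, give the consumer back to the pool
  // before returning an empty iterator.
  def computePartition[T](consumer: PooledConsumer, range: OffsetRange)(
      read: => Iterator[T]): Iterator[T] = {
    require(range.fromOffset <= range.untilOffset, s"invalid range: $range")
    if (range.fromOffset == range.untilOffset) {
      consumer.release() // the missing call: without it the consumer and its FDs leak
      Iterator.empty
    } else {
      // In this sketch the caller's `read` block is responsible for releasing
      // the consumer once the records have been consumed.
      read
    }
  }
}
```

With an empty range the consumer is released exactly once and no records are read; with a non-empty range the release is left to the reading path.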
    
    ## How was this patch tested?
    
    I haven't run any specific tests; I would appreciate help on how to test methods that run inside `RDD.compute`.
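
One possible way to unit-test this kind of leak, sketched under assumptions: pull the acquire/release logic behind a small seam and drive it with a test double that counts consumers still checked out. `ConsumerHandle`, `TrackingPool`, `LeakCheck` and `computeCount` are all hypothetical names invented for this illustration; none of them are Spark APIs, and a real test would have to adapt the idea to how `KafkaSourceRDD` actually obtains its consumer.

```scala
import scala.collection.mutable

// Hypothetical handle: calling release() reports the id back to its pool.
final class ConsumerHandle(val id: Int, onRelease: Int => Unit) {
  def release(): Unit = onRelease(id)
}

// Hypothetical test double: a pool that remembers which handles are still checked out.
final class TrackingPool {
  private val outstanding = mutable.Set.empty[Int]
  private var nextId = 0

  def acquire(): ConsumerHandle = {
    nextId += 1
    outstanding += nextId
    new ConsumerHandle(nextId, id => outstanding -= id)
  }

  def outstandingCount: Int = outstanding.size
}

object LeakCheck {
  // Hypothetical compute-like function under test: it acquires one consumer
  // per partition and must release it on every path, including the empty range.
  def computeCount(pool: TrackingPool, fromOffset: Long, untilOffset: Long): Long = {
    val handle = pool.acquire()
    if (fromOffset == untilOffset) {
      handle.release() // deleting this line makes outstandingCount stay at 1
      0L
    } else {
      try untilOffset - fromOffset // stand-in for actually reading the records
      finally handle.release()
    }
  }
}
```

A test then asserts that `outstandingCount` is back to zero after both an empty and a non-empty range; dropping the release in the empty branch makes the first assertion fail.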

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/YuvalItzchakov/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21997
    
----
commit c20bd14a4bed34644efc11de420a1caeccea329e
Author: Yuval Itzchakov <yu...@...>
Date:   2017-08-26T15:21:17Z

    Avoid using "return" inside `CachedKafkaConsumer.get`, as the block is passed to `org.apache.spark.util.UninterruptibleThread.runUninterruptibly` as a function, which causes a `NonLocalReturnControl` exception to be thrown on every call

commit 18b9301553427a7b6c038e144f1be52949d82eb9
Author: Yuval Itzchakov <yu...@...>
Date:   2017-08-27T07:58:01Z

    Comments after code review

commit 46af335ed42371c4bd200e63c9bec351ddcb112e
Author: Yuval Itzchakov <yu...@...>
Date:   2018-08-02T17:51:41Z

    Merge remote-tracking branch 'upstream/master'

commit 059b47a9a62a4630cfd1f43d4e3de41989adfd1b
Author: Yuval Itzchakov <yu...@...>
Date:   2018-08-04T07:23:39Z

    Merge remote-tracking branch 'origin/master'

commit 2b43146ff0155301cad403605f15171a8c6a9149
Author: Yuval Itzchakov <yu...@...>
Date:   2018-08-04T07:24:24Z

    Fixes SPARK-24987. Kafka consumer wasn't released when `fromOffset` was equal to `toOffset`.

commit 7558d422ae24daf9d3cffc43b5ef3d975c4c9d3a
Author: Yuval Itzchakov <yu...@...>
Date:   2018-08-04T07:28:26Z

    Merge remote-tracking branch 'upstream/master'

----
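
The first commit above avoids `return` inside a block passed to a wrapper function. The sketch below illustrates why, assuming Scala 2 semantics (Scala 3 deprecates this form of non-local return). `runWrapped` is a hypothetical stand-in for a wrapper such as `runUninterruptibly`, and the cache lookup is invented for illustration. A `return` inside the block has to exit the enclosing method rather than the block, so the compiler implements it by throwing `scala.runtime.NonLocalReturnControl`, a `ControlThrowable`, and catching it in the enclosing method.

```scala
import scala.util.control.ControlThrowable

object NonLocalReturnDemo {
  // Counts control-flow throwables that pass through the wrapper.
  var controlThrowablesSeen = 0

  // Hypothetical stand-in for a wrapper like runUninterruptibly: it takes a
  // by-name block (compiled to a function value) and evaluates it.
  def runWrapped[T](body: => T): T =
    try body
    catch {
      case t: ControlThrowable =>
        controlThrowablesSeen += 1
        throw t // rethrow so the enclosing method can complete its return
    }

  // `return` here exits lookupWithReturn, not the block, so it is compiled
  // into a thrown NonLocalReturnControl whenever that branch is taken.
  def lookupWithReturn(cache: Map[String, Int], key: String): Int = runWrapped {
    if (cache.contains(key)) return cache(key)
    -1
  }

  // Same result expressed as the block's value: no exception is involved.
  def lookup(cache: Map[String, Int], key: String): Int = runWrapped {
    cache.getOrElse(key, -1)
  }
}
```

Both methods return the same values, but only the version with `return` pays for an exception allocation and unwind each time the early-exit branch runs.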


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21997


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    **[Test build #94218 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94218/testReport)** for PR 21997 at commit [`7558d42`](https://github.com/apache/spark/commit/7558d422ae24daf9d3cffc43b5ef3d975c4c9d3a).


[GitHub] spark pull request #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when ...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21997#discussion_r207716713
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
    @@ -124,8 +124,6 @@ private[kafka010] class KafkaSourceRDD(
           thePart: Partition,
           context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
         val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
    -    val topic = sourcePartition.offsetRange.topic
    -    val kafkaPartition = sourcePartition.offsetRange.partition
    --- End diff --
    
    I see these are unused...


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94218/


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by YuvalItzchakov <gi...@git.apache.org>.
Github user YuvalItzchakov commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    This is the same as https://github.com/apache/spark/pull/21983, only opened against master (per @felixcheung's comment). It should also be merged to branch-2.3.


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    @koeninger back


[GitHub] spark pull request #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when ...

Posted by YuvalItzchakov <gi...@git.apache.org>.
Github user YuvalItzchakov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21997#discussion_r207716764
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
    @@ -124,8 +124,6 @@ private[kafka010] class KafkaSourceRDD(
           thePart: Partition,
           context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
         val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
    -    val topic = sourcePartition.offsetRange.topic
    -    val kafkaPartition = sourcePartition.offsetRange.partition
    --- End diff --
    
    Yes, removed them since they were unused.


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    **[Test build #94218 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94218/testReport)** for PR 21997 at commit [`7558d42`](https://github.com/apache/spark/commit/7558d422ae24daf9d3cffc43b5ef3d975c4c9d3a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ArrayFilter(`


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    ok to test


[GitHub] spark issue #21997: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21997
  
    Can one of the admins verify this patch?

