Posted to reviews@spark.apache.org by ScrapCodes <gi...@git.apache.org> on 2017/05/30 06:29:42 UTC

[GitHub] spark pull request #18143: Simplificaiton of CachedKafkaConsumer using guava...

GitHub user ScrapCodes opened a pull request:

    https://github.com/apache/spark/pull/18143

    Simplificaiton of CachedKafkaConsumer using guava cache.

    ## What changes were proposed in this pull request?
    
    Along the lines of SPARK-19968, a Guava cache can be used to simplify the code in CachedKafkaConsumer as well, with the additional feature of automatically cleaning up a consumer that has been unused for a configurable time.
    
    ## How was this patch tested?
    
    Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ScrapCodes/spark kafkaConsumer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18143
    
----
commit 710dae8851b7779936d1e1bc987567b113ae1bce
Author: Prashant Sharma <pr...@in.ibm.com>
Date:   2017-05-30T06:15:09Z

    Simplificaiton of CachedKafkaConsumer using guava cache.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @brkyvz I think consumers and producers need different cache strategies, so sharing the same interface seems weird. We can share the same producer across multiple tasks at the same time, but that's not allowed for consumers. The interface I'm expecting for consumers is:
    
    ```
    // Create or fetch a consumer from cache if any. The cache doesn't store this consumer.
    def borrowConsumer(groupId: ...): CachedKafkaConsumer = {...}
    
    // Return a consumer to the cache.
    def returnConsumer(consumer: CachedKafkaConsumer): Unit = {...}  
    ```
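
A minimal sketch of the borrow/return idea above, using only the Scala standard library. `PooledConsumer`, `ConsumerPool`, `maxIdlePerGroup`, and `idleCount` are hypothetical names introduced for illustration, and keying the pool by `groupId: String` alone is an assumption, since the comment leaves the parameter list elided. The point it illustrates is that a borrowed consumer is taken out of the pool, so no second task can receive it until it is returned.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a real Kafka consumer wrapper.
final class PooledConsumer(val groupId: String) {
  var closed = false
  def close(): Unit = closed = true
}

// Only idle consumers live in the pool. A borrowed consumer is removed from it,
// so two tasks can never hold the same consumer at the same time.
final class ConsumerPool(maxIdlePerGroup: Int) {
  private val idle = mutable.Map.empty[String, mutable.Queue[PooledConsumer]]

  // Reuse an idle consumer for the group if one exists, otherwise create one.
  def borrowConsumer(groupId: String): PooledConsumer = synchronized {
    idle.get(groupId).filter(_.nonEmpty).map(_.dequeue())
      .getOrElse(new PooledConsumer(groupId))
  }

  // Hand a consumer back; close it instead if the idle queue is already full.
  def returnConsumer(consumer: PooledConsumer): Unit = synchronized {
    val queue = idle.getOrElseUpdate(consumer.groupId, mutable.Queue.empty[PooledConsumer])
    if (queue.size < maxIdlePerGroup) queue.enqueue(consumer)
    else consumer.close()
  }

  // Number of idle consumers currently held for the group.
  def idleCount(groupId: String): Int = synchronized {
    idle.get(groupId).map(_.size).getOrElse(0)
  }
}
```

With this shape, two concurrent `borrowConsumer` calls for the same group always yield distinct consumers, which is the property a shared producer cache does not need.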


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79710 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79710/testReport)** for PR 18143 at commit [`5c105e5`](https://github.com/apache/spark/commit/5c105e5d9cae2140f7b4dd42f88ddeeb65389237).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119132090
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    +    val capacity = conf.getInt(capacityConfigString, 64)
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    --- End diff --
    
    It seems like the general trend towards default configuration values is to make them work for the lowest common denominator use case, in which case I'd argue for a longer (30 min?) default timeout.
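
To make the trade-off concrete, here is a rough sketch of idle-timeout eviction in plain Scala. It is not the PR's Guava-based code (the quoted diff only shows the `spark.sql.kafkaConsumerCache.timeout` setting being read with a `10m` default); `IdleExpiringCache`, `onExpire`, and the caller-supplied `nowMs` clock are hypothetical and exist only so the behaviour can be checked without sleeping.

```scala
import scala.collection.mutable

// Hypothetical idle-expiry cache. The caller passes the current time explicitly.
final class IdleExpiringCache[K, V](timeoutMs: Long, onExpire: V => Unit) {
  // key -> (value, time of last access in milliseconds)
  private val entries = mutable.Map.empty[K, (V, Long)]

  // Fetch or create the value for `key`, recording `nowMs` as its last access.
  def get(key: K, nowMs: Long)(create: => V): V = synchronized {
    evictIdle(nowMs)
    val value = entries.get(key).map(_._1).getOrElse(create)
    entries.update(key, (value, nowMs))
    value
  }

  // Drop every entry that has not been accessed within the timeout.
  def evictIdle(nowMs: Long): Unit = synchronized {
    val expired = entries.collect {
      case (k, (v, last)) if nowMs - last >= timeoutMs => (k, v)
    }.toList
    expired.foreach { case (k, v) =>
      entries.remove(k)
      onExpire(v)
    }
  }

  def size: Int = synchronized(entries.size)
}
```

In this sketch, a job whose batches arrive 20 minutes apart recreates its consumer on every batch with a 10-minute timeout, but keeps reusing it with a 30-minute timeout.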


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Thanks @koeninger for taking a look. I did some testing to see if performance was impacted, and made corrections. 


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77532 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77532/testReport)** for PR 18143 at commit [`1e0ac23`](https://github.com/apache/spark/commit/1e0ac23128583c42a13fa4bd0b0950e6531cc3f0).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79710 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79710/testReport)** for PR 18143 at commit [`5c105e5`](https://github.com/apache/spark/commit/5c105e5d9cae2140f7b4dd42f88ddeeb65389237).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79707/
    Test FAILed.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119562656
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
     
       private var consumer = createConsumer
     
    -  /** indicates whether this consumer is in use or not */
    -  private var inuse = true
    --- End diff --
    
    Don't we need to track this anymore? Will we evict a consumer in use?
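
For context, the guard that the removed `inuse` flag provided can be sketched with the standard library alone. This mirrors the `LinkedHashMap` pattern visible in the removed lines of the diff; `TrackedConsumer` and `GuardedLruCache` are hypothetical names, and the `String` key is a simplification of the real `CacheKey`.

```scala
import java.{util => ju}

// Hypothetical stand-in for CachedKafkaConsumer with the removed `inuse` flag.
final class TrackedConsumer(val name: String) {
  var inuse = true
  var closed = false
  def close(): Unit = closed = true
}

// Access-ordered LinkedHashMap that evicts the eldest entry only when it is idle.
final class GuardedLruCache(capacity: Int)
    extends ju.LinkedHashMap[String, TrackedConsumer](capacity, 0.75f, true) {

  override def removeEldestEntry(entry: ju.Map.Entry[String, TrackedConsumer]): Boolean = {
    if (!entry.getValue.inuse && size() > capacity) {
      entry.getValue.close() // safe: no task is using this consumer
      true
    } else {
      false // eldest entry is in use (or we are under capacity), keep it
    }
  }
}
```

The consequence of the guard is that the map can temporarily grow past `capacity` while the eldest consumer is in use. A size-bounded cache without such a flag has no way to know that an entry is still held by a task, which is the concern raised here.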


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119084772
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -18,19 +18,19 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    -import java.util.concurrent.TimeoutException
    -
    -import scala.collection.JavaConverters._
    +import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
     
    +import com.google.common.cache._
     import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
     import org.apache.kafka.common.TopicPartition
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    --- End diff --
    
    Thanks @srowen for taking a look. I am a bit unsure how to do that.
    BTW, is this not fully covered by our scalastyle check?


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77640 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77640/testReport)** for PR 18143 at commit [`dad946f`](https://github.com/apache/spark/commit/dad946f6aecab3a85eee106db88dc0b3551bed02).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    I am currently trying to run some performance tests to see how this change affects performance. Meanwhile, it would be helpful to know whether things are moving in the right direction.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #86270 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86270/testReport)** for PR 18143 at commit [`5840a3c`](https://github.com/apache/spark/commit/5840a3c3481178d8490462a491c7a0b8551e163e).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77590 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77590/testReport)** for PR 18143 at commit [`a71f9e6`](https://github.com/apache/spark/commit/a71f9e6aa87f3eb5bde722637350b387ba33b2f9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119073318
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
         // If this is reattempt at running the task, then invalidate cache and start with
         // a new consumer
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
    -      }
    -      val consumer = cache.get(key)
    -      consumer.inuse = true
    -      consumer
    +      cache.invalidate(key)
         }
    +
    +    val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
    --- End diff --
    
    I'm probably wrong about this, but does Scala let you write a simple lambda here, since it's an interface with just one method? It might be clearer if so.
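
As a small illustration of the two forms, using only `java.util.concurrent.Callable`: Scala 2.12 and later convert a function literal to a single-abstract-method Java interface, while Scala 2.11 does so only under the `-Xexperimental` compiler flag, so the answer depends on which Scala versions the build targets. `SamExample` and the returned string are placeholders, not code from the PR.

```scala
import java.util.concurrent.Callable

object SamExample {
  // Anonymous-class form, as in the diff; compiles on every Scala version.
  val explicit: Callable[String] = new Callable[String] {
    override def call(): String = "new consumer"
  }

  // Function-literal form; relies on SAM conversion (Scala 2.12 and later).
  val lambda: Callable[String] = () => "new consumer"
}
```

Both values behave identically when `call()` is invoked; the difference is purely in how much ceremony the call site carries.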


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119636950
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
     
       private case class CacheKey(groupId: String, topic: String, partition: Int)
     
    -  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
    -  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
    +  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
     
       /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
       def init(
           initialCapacity: Int,
           maxCapacity: Int,
           loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
    --- End diff --
    
    Load factor is being ignored at this point, yes?


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Sorry, I noticed a couple more minor configuration-related things. Otherwise LGTM.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79708/
    Test FAILed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79711 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79711/testReport)** for PR 18143 at commit [`ea30495`](https://github.com/apache/spark/commit/ea304954a64dbeb68e6d2508173aac13c3392a3a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79711/
    Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79710/
    Test FAILed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79909/
    Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77532/
    Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77590 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77590/testReport)** for PR 18143 at commit [`a71f9e6`](https://github.com/apache/spark/commit/a71f9e6aa87f3eb5bde722637350b387ba33b2f9).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79907 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79907/testReport)** for PR 18143 at commit [`db86d97`](https://github.com/apache/spark/commit/db86d97d940bc887bb6ba93d763d178ccd3d88c2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119114480
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    +    val capacity = conf.getInt(capacityConfigString, 64)
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
    +        n.getCause match {
    +          case RemovalCause.SIZE =>
    +            logWarning(
    --- End diff --
    
    I still don't see why this is a warning; it's normal operation.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77522 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77522/testReport)** for PR 18143 at commit [`4bd48aa`](https://github.com/apache/spark/commit/4bd48aa1ef924968eff4226c92cea2a37c7f02a6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #86529 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86529/testReport)** for PR 18143 at commit [`73d59f6`](https://github.com/apache/spark/commit/73d59f63e912612816e2163d1a9b74edb5504b86).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @ScrapCodes I think it should be bounded by `spark.sql.kafkaConsumerCache.capacity`.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77522 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77522/testReport)** for PR 18143 at commit [`4bd48aa`](https://github.com/apache/spark/commit/4bd48aa1ef924968eff4226c92cea2a37c7f02a6).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79708 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79708/testReport)** for PR 18143 at commit [`a0f1aa3`](https://github.com/apache/spark/commit/a0f1aa383c696d3d754e8709e361e502720be227).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119073484
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
         // If this is reattempt at running the task, then invalidate cache and start with
         // a new consumer
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
    -      }
    -      val consumer = cache.get(key)
    -      consumer.inuse = true
    -      consumer
    +      cache.invalidate(key)
         }
    +
    +    val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
    +      override def call(): CachedKafkaConsumer = {
    +        new CachedKafkaConsumer(topicPartition, kafkaParams)
    +      }
    +    })
    +    consumer.inuse = true
    --- End diff --
    
    I think there's a race condition now, because modifying `inuse` isn't done atomically anymore.
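
One way to picture the concern: if the flag is flipped after the cache lookup returns, two threads can both believe they own the consumer. A minimal sketch of an atomic claim, assuming a hypothetical `Consumer` stand-in (this is not the approach taken in the PR, just an illustration using `AtomicBoolean.compareAndSet`):

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object InUseDemo {
  // Hypothetical stand-in for a cached consumer; not the PR's class.
  final class Consumer {
    private val inUse = new AtomicBoolean(false)
    // Returns true only for the single caller that flips false -> true.
    def tryAcquire(): Boolean = inUse.compareAndSet(false, true)
    def release(): Unit = inUse.set(false)
  }

  // Starts `threads` threads that race to claim one consumer and
  // returns how many of them succeeded.
  def contend(threads: Int): Int = {
    val consumer = new Consumer
    val winners = new AtomicInteger(0)
    val ts = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          if (consumer.tryAcquire()) { winners.incrementAndGet(); () }
        }
      })
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
    winners.get()
  }
}
```

Because nobody calls `release()` in this sketch, at most one thread can win the claim, which is the property a plain `var inuse` assignment does not give you.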


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119371920
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
     
       private case class CacheKey(groupId: String, topic: String, partition: Int)
     
    -  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
    -  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
    +  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
     
       /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
       def init(
           initialCapacity: Int,
           maxCapacity: Int,
           loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
    -    if (null == cache) {
    -      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
    -      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
    -        initialCapacity, loadFactor, true) {
    -        override def removeEldestEntry(
    -          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
    -          if (this.size > maxCapacity) {
    -            try {
    -              entry.getValue.consumer.close()
    -            } catch {
    -              case x: KafkaException =>
    -                logError("Error closing oldest Kafka consumer", x)
    -            }
    -            true
    -          } else {
    -            false
    -          }
    +    val duration = SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer[_, _]] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): Unit = {
    +        n.getCause match {
    +          case RemovalCause.SIZE =>
    +            logWarning(
    +              s"Evicting consumer ${n.getKey}, due to size limit reached. Capacity: $maxCapacity.")
    +          case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
    +        }
    +        try {
    +          n.getValue.close()
    +        } catch {
    +          case NonFatal(e) =>
    +            logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
    +              s" evicted from cache due to ${n.getCause}", e)
             }
           }
         }
    +    if (cache == null) {
    --- End diff --
    
    Shouldn't the whole function be guarded by this if statement?  Is there any reason to construct a removal listener otherwise?
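
A small sketch of the shape being suggested, with everything (including the listener construction) inside the null check so repeated calls do no work. The names here are hypothetical and a plain `HashMap` stands in for the real cache:

```scala
object InitOnceDemo {
  private var cache: java.util.Map[String, String] = null
  private var listenersBuilt = 0

  // Further calls after the first are ignored, and nothing is constructed for them.
  def init(): Unit = synchronized {
    if (cache == null) {
      listenersBuilt += 1 // stands in for constructing the removal listener
      cache = new java.util.HashMap[String, String]()
    }
  }

  // How many times the (stand-in) listener was constructed.
  def listenersCreated: Int = synchronized(listenersBuilt)
}
```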


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119088849
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -18,19 +18,19 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    -import java.util.concurrent.TimeoutException
    -
    -import scala.collection.JavaConverters._
    +import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
     
    +import com.google.common.cache._
     import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
     import org.apache.kafka.common.TopicPartition
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    --- End diff --
    
    Got it, you mean the Scala imports should come right after the Java ones.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119085683
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
         // If this is reattempt at running the task, then invalidate cache and start with
         // a new consumer
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
    -      }
    -      val consumer = cache.get(key)
    -      consumer.inuse = true
    -      consumer
    +      cache.invalidate(key)
         }
    +
    +    val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
    --- End diff --
    
    AFAIK, this is possible from Scala 2.12 onwards. [reference](http://www.scala-lang.org/news/2.12.0#lambda-syntax-for-sam-types)
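
For comparison, a short sketch of the two spellings of a `Callable`. The explicit anonymous class is the form the diff uses; the lambda form relies on SAM conversion and so is expected to compile only on Scala 2.12 or later. `SamDemo` and the returned string are illustrative only:

```scala
import java.util.concurrent.Callable

object SamDemo {
  // Explicit anonymous class: compiles on older Scala versions as well.
  val explicit: Callable[String] = new Callable[String] {
    override def call(): String = "consumer"
  }

  // Function literal converted to a SAM type: Scala 2.12+ only.
  val lambda: Callable[String] = () => "consumer"
}
```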


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119374298
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -147,20 +155,14 @@ object CachedKafkaConsumer extends Logging {
           groupId: String,
           topic: String,
           partition: Int,
    -      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
    -    CachedKafkaConsumer.synchronized {
    +      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = {
           val k = CacheKey(groupId, topic, partition)
    -      val v = cache.get(k)
    -      if (null == v) {
    -        logInfo(s"Cache miss for $k")
    -        logDebug(cache.keySet.toString)
    -        val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
    -        cache.put(k, c)
    -        c
    -      } else {
    -        // any given topicpartition should have a consistent key and value type
    -        v.asInstanceOf[CachedKafkaConsumer[K, V]]
    -      }
    +      val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
    +        override def call(): CachedKafkaConsumer[K, V] = {
    +          new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
    --- End diff --
    
    I think it's worth keeping the info / debug level logging for when a cache miss has happened.
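
Since Guava's `Cache.get(key, loader)` only invokes the loader when the key is absent, the loader body is a natural place for that logging. A rough sketch, assuming Guava is on the classpath; `println` stands in for `logInfo`, and the `String` key and value types are placeholders for the real ones:

```scala
import java.util.concurrent.Callable
import java.util.concurrent.atomic.AtomicInteger
import com.google.common.cache.{Cache, CacheBuilder}

object CacheMissDemo {
  private val cache: Cache[String, String] =
    CacheBuilder.newBuilder().maximumSize(16).build[String, String]()

  // Counts loader invocations, i.e. cache misses.
  val misses = new AtomicInteger(0)

  def get(key: String): String =
    cache.get(key, new Callable[String] {
      override def call(): String = {
        misses.incrementAndGet()
        println(s"Cache miss for $key") // stand-in for logInfo
        s"consumer-for-$key"
      }
    })
}
```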


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119636360
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
     
       private case class CacheKey(groupId: String, topic: String, partition: Int)
     
    -  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
    -  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
    +  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
     
       /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
       def init(
           initialCapacity: Int,
           maxCapacity: Int,
           loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
    -    if (null == cache) {
    -      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
    -      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
    -        initialCapacity, loadFactor, true) {
    -        override def removeEldestEntry(
    -          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
    -          if (this.size > maxCapacity) {
    -            try {
    -              entry.getValue.consumer.close()
    -            } catch {
    -              case x: KafkaException =>
    -                logError("Error closing oldest Kafka consumer", x)
    -            }
    -            true
    -          } else {
    -            false
    +    if (cache == null) {
    +      val duration =
    +        SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", "30m")
    +      val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer[_, _]] {
    +        override def onRemoval(
    +            n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): Unit = {
    +          n.getCause match {
    +            case RemovalCause.SIZE =>
    +              logWarning(
    +                s"Evicting consumer ${n.getKey}," +
    +                  s" due to size limit reached. Capacity: $maxCapacity.")
    +            case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
    +          }
    +          try {
    +            n.getValue.close()
    +          } catch {
    +            case NonFatal(e) =>
    +              logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
    +                s" evicted from cache due to ${n.getCause}", e)
               }
             }
           }
    +
    +      cache = CacheBuilder.newBuilder()
    +        .maximumSize(maxCapacity).removalListener(removalListener)
    --- End diff --
    
    `initialCapacity` is being passed in but not used; this should probably set the initial capacity as well.
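
A sketch of what wiring all three settings into the builder could look like, assuming Guava is on the classpath. `BoundedCacheDemo`, the `String` key and value types, and the recording listener are illustrative only; the builder calls (`initialCapacity`, `maximumSize`, `expireAfterAccess`, `removalListener`, `build`) are the standard `CacheBuilder` ones:

```scala
import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}

object BoundedCacheDemo {
  // Records "key:cause" for every removal, in place of closing a consumer.
  val removed = new CopyOnWriteArrayList[String]()

  private val listener = new RemovalListener[String, String] {
    override def onRemoval(n: RemovalNotification[String, String]): Unit = {
      removed.add(s"${n.getKey}:${n.getCause}")
      ()
    }
  }

  def build(initialCapacity: Int, maxCapacity: Int, timeoutMs: Long): Cache[String, String] =
    CacheBuilder.newBuilder()
      .initialCapacity(initialCapacity)   // sizing hint, previously dropped
      .maximumSize(maxCapacity)           // upper bound on cached entries
      .expireAfterAccess(timeoutMs, TimeUnit.MILLISECONDS)
      .removalListener[String, String](listener)
      .build[String, String]()
}
```

Note that `loadFactor` has no direct counterpart on `CacheBuilder`, so it would presumably remain unused.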


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Thanks @ScrapCodes for this PR. I was envisioning using a common cache interface for both the producer and the consumer, which kinda looked like:
    ```scala
    trait KafkaClientCache[K, V] {
      lazy val initialSize: Int
      lazy val maximumSize: Int
      lazy val timeoutDuration: Duration
    
      def createNewClient: V // to be implemented
    
      final def getOrCreate(key: K): V = {
        ... implementation
      }
    
      def onRemove(n: RemovalNotification[K, V]): Unit  // to be implemented
    
      // For testing
      private def clear(): Unit
    }
    ```
    
    or something like that. Feel free to push back if you think it's over-abstracting.
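
To make the shape a bit more concrete, here is a compilable sketch of such a trait. It is only an approximation: it swaps in a plain `ConcurrentHashMap` instead of a Guava cache, so there is no size bound, timeout, or removal callback, and the names `ClientCache` and `ClientCacheDemo` are made up for illustration. Abstract members are declared with `def`, since Scala does not allow an abstract `lazy val` or an abstract `private` member:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

trait ClientCache[K, V] {
  // To be implemented by the producer or consumer cache.
  protected def createNewClient(key: K): V

  private val clients = new ConcurrentHashMap[K, V]()

  // Creates the client at most once per key.
  final def getOrCreate(key: K): V =
    clients.computeIfAbsent(key, new JFunction[K, V] {
      override def apply(k: K): V = createNewClient(k)
    })

  // For testing
  def clear(): Unit = clients.clear()
}

object ClientCacheDemo {
  val created = new AtomicInteger(0)

  val cache: ClientCache[String, String] = new ClientCache[String, String] {
    override protected def createNewClient(key: String): String = {
      created.incrementAndGet()
      s"client-$key"
    }
  }
}
```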


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77640 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77640/testReport)** for PR 18143 at commit [`dad946f`](https://github.com/apache/spark/commit/dad946f6aecab3a85eee106db88dc0b3551bed02).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79907/testReport)** for PR 18143 at commit [`db86d97`](https://github.com/apache/spark/commit/db86d97d940bc887bb6ba93d763d178ccd3d88c2).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77522/
    Test PASSed.


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79711 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79711/testReport)** for PR 18143 at commit [`ea30495`](https://github.com/apache/spark/commit/ea304954a64dbeb68e6d2508173aac13c3392a3a).


[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119073041
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -311,61 +311,40 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
       private lazy val cache = {
         val conf = SparkEnv.get.conf
         val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
    +        val logMsg: String = s"Evicting consumer ${n.getKey}, cause: ${n.getCause}"
    +        n.getCause match {
    +          case RemovalCause.SIZE => logWarning(logMsg)
    --- End diff --
    
    Why is removing for size a warning?
    You probably want different messages then and, if so, should put the message string in the invocation itself. Otherwise you're always constructing the message, even in the common path where the debug message is ignored.
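
The cost difference comes from by-name parameters: a message passed directly to a method declared with `msg: => String` is only built if the method actually uses it, whereas a message bound to a `val` first is always built. A minimal sketch with a hypothetical `logDebug` stand-in (not Spark's `Logging` trait itself):

```scala
object LazyLogDemo {
  // Hypothetical logger: `msg` is by-name, so it is evaluated only when enabled.
  def logDebug(enabled: Boolean)(msg: => String): Unit =
    if (enabled) println(msg)

  // Builds the message up front; returns how many times it was constructed.
  def eager(enabled: Boolean): Int = {
    var built = 0
    val logMsg = { built += 1; "Evicting consumer k, cause: EXPIRED" }
    logDebug(enabled)(logMsg)
    built
  }

  // Passes the message expression in the invocation; built only on demand.
  def deferred(enabled: Boolean): Int = {
    var built = 0
    logDebug(enabled) { built += 1; "Evicting consumer k, cause: EXPIRED" }
    built
  }
}
```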




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119582631
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
     
       private var consumer = createConsumer
     
    -  /** indicates whether this consumer is in use or not */
    -  private var inuse = true
    --- End diff --
    
    Thanks, good point!
    According to the Guava cache docs, when the cache "approaches" the size limit: "The cache will try to evict entries that haven't been used recently or very often."
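
For context, the `inuse` flag removed in this diff guarded eviction in the earlier `LinkedHashMap`-based cache quoted elsewhere in this thread. A stand-alone sketch of that guard follows; `Entry` and `InUseAwareCache` are hypothetical names, and the values are plain objects rather than Kafka consumers.

```scala
import java.{util => ju}

// Hypothetical cached value carrying an in-use flag like the one the diff removes.
final class Entry(val name: String) {
  @volatile var inuse = true
  @volatile var closed = false
  def close(): Unit = { closed = true }
}

// Access-ordered map that evicts the eldest entry only when that entry is idle
// and the map has grown past `capacity`.
class InUseAwareCache(capacity: Int)
    extends ju.LinkedHashMap[String, Entry](16, 0.75f, true) {
  override def removeEldestEntry(e: ju.Map.Entry[String, Entry]): Boolean =
    if (!e.getValue.inuse && this.size > capacity) {
      e.getValue.close()
      true
    } else {
      false
    }
}
```

Note that this guard lets the map grow past `capacity` while the eldest entry is still in use, which is the trade-off the old code accepted.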




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86529/
    Test PASSed.




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119114508
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    --- End diff --
    
    Why declare a type?




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes closed the pull request at:

    https://github.com/apache/spark/pull/18143




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119371285
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
     
       private case class CacheKey(groupId: String, topic: String, partition: Int)
     
    -  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
    -  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
    +  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
     
       /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
       def init(
           initialCapacity: Int,
           maxCapacity: Int,
           loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
    -    if (null == cache) {
    -      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
    -      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
    -        initialCapacity, loadFactor, true) {
    -        override def removeEldestEntry(
    -          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
    -          if (this.size > maxCapacity) {
    -            try {
    -              entry.getValue.consumer.close()
    -            } catch {
    -              case x: KafkaException =>
    -                logError("Error closing oldest Kafka consumer", x)
    -            }
    -            true
    -          } else {
    -            false
    -          }
    +    val duration = SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
    --- End diff --
    
    I don't think this should be using configuration from the spark.sql namespace




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79707 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79707/testReport)** for PR 18143 at commit [`de733f1`](https://github.com/apache/spark/commit/de733f14bbb447de593278b228635e6f1cea6f2c).




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77777/
    Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119072852
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -18,19 +18,19 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    -import java.util.concurrent.TimeoutException
    -
    -import scala.collection.JavaConverters._
    +import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
     
    +import com.google.common.cache._
     import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
     import org.apache.kafka.common.TopicPartition
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    --- End diff --
    
    These are missorted now -- the imports shouldn't move




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @ScrapCodes sorry for the delay. I think @tdas has fixed the issue. Please close the PR.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #86529 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86529/testReport)** for PR 18143 at commit [`73d59f6`](https://github.com/apache/spark/commit/73d59f63e912612816e2163d1a9b74edb5504b86).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @brkyvz and @zsxwing, do you think this object pool should be bounded? Or, in case the object pool is unbounded, could its entries become weak reference values?





[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77532 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77532/testReport)** for PR 18143 at commit [`1e0ac23`](https://github.com/apache/spark/commit/1e0ac23128583c42a13fa4bd0b0950e6531cc3f0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79908/testReport)** for PR 18143 at commit [`3c6f7c6`](https://github.com/apache/spark/commit/3c6f7c609889aad7c01249430d1fcc170410cb03).




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77640/
    Test PASSed.




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119131006
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    +    val capacity = conf.getInt(capacityConfigString, 64)
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
    +        n.getCause match {
    +          case RemovalCause.SIZE =>
    +            logWarning(
    --- End diff --
    
    Eviction for size shouldn't really be a normal operation. If capacity is smaller than the number of partitions that are regularly being assigned to a given node, it's going to kill performance due to recreating consumers every batch.
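
The cost described here can be illustrated with a small simulation, assuming a least-recently-used cache and a workload that reads every assigned partition once per batch. `LruConsumerCache` and `ThrashDemo` are hypothetical names, and the cached "consumers" are just strings.

```scala
import java.{util => ju}

// Hypothetical stand-in for a consumer cache; "consumers" are just strings here.
class LruConsumerCache(capacity: Int) {
  var created = 0 // number of times a consumer had to be (re)created

  // accessOrder = true keeps entries in least-recently-used order.
  private val cache = new ju.LinkedHashMap[Int, String](16, 0.75f, true) {
    override def removeEldestEntry(e: ju.Map.Entry[Int, String]): Boolean =
      this.size > capacity
  }

  // Returns the cached consumer for a partition, creating one on a miss.
  def getOrCreate(partition: Int): String = {
    val cached = cache.get(partition)
    if (cached != null) {
      cached
    } else {
      created += 1
      val consumer = s"consumer-$partition"
      cache.put(partition, consumer)
      consumer
    }
  }
}

object ThrashDemo {
  // Reads every partition once per batch and returns how many consumers were created.
  def run(capacity: Int, partitions: Int, batches: Int): Int = {
    val cache = new LruConsumerCache(capacity)
    for (_ <- 1 to batches; p <- 0 until partitions) cache.getOrCreate(p)
    cache.created
  }
}
```

In this sketch, `ThrashDemo.run(2, 3, 4)` creates a consumer on every one of the 12 reads, because each partition is evicted just before it is needed again, whereas `ThrashDemo.run(3, 3, 4)` creates only 3.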




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119116252
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    +    val capacity = conf.getInt(capacityConfigString, 64)
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
    +        n.getCause match {
    +          case RemovalCause.SIZE =>
    +            logWarning(
    +              s"""
    +                 |Evicting consumer ${n.getKey}, due to size limit reached. Capacity: $capacity.
    +                 |This can be configured using $capacityConfigString.
    +                 |"""".stripMargin)
    +          case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
    +        }
    +        try {
    +          n.getValue.close()
    +        } catch {
    +          case NonFatal(e) =>
    +            logError(s"Error closing Kafka consumer: ${n.getKey}, evicted due to ${n.getCause}", e)
    --- End diff --
    
    On the flip side, this isn't really an error -- you can recover by just continuing.
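
One reading of this comment is that a failed `close()` during eviction could be reported at a lower level and then ignored. A minimal sketch under that assumption; `CloseDemo` and `closeQuietly` are hypothetical names, and `println` stands in for a warning-level log call.

```scala
import scala.util.control.NonFatal

object CloseDemo {
  // Closes a resource and reports whether it closed cleanly. A non-fatal failure
  // is reported as a warning and swallowed, so the caller simply carries on.
  def closeQuietly(name: String, resource: AutoCloseable): Boolean =
    try {
      resource.close()
      true
    } catch {
      case NonFatal(e) =>
        println(s"WARN Failed to close $name, continuing: ${e.getMessage}")
        false
    }
}
```

Fatal errors such as `OutOfMemoryError` are not matched by `NonFatal` and still propagate.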




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r162045340
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
     
       private var consumer = createConsumer
     
    -  /** indicates whether this consumer is in use or not */
    -  private var inuse = true
    --- End diff --
    
    Now that we have moved to using object pools, this tracking is no longer required.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Thanks @koeninger for helping me.
    
    Thanks @brkyvz, please take a look again and see if what I have done is along the lines of the change you wanted.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79908/
    Test PASSed.




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119116101
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    +    val capacity = conf.getInt(capacityConfigString, 64)
    +    val duration = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
    +    val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer] {
    +      override def onRemoval(n: RemovalNotification[CacheKey, CachedKafkaConsumer]): Unit = {
    +        n.getCause match {
    +          case RemovalCause.SIZE =>
    +            logWarning(
    +              s"""
    +                 |Evicting consumer ${n.getKey}, due to size limit reached. Capacity: $capacity.
    +                 |This can be configured using $capacityConfigString.
    +                 |"""".stripMargin)
    +          case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
    +        }
    +        try {
    +          n.getValue.close()
    +        } catch {
    +          case NonFatal(e) =>
    +            logError(s"Error closing Kafka consumer: ${n.getKey}, evicted due to ${n.getCause}", e)
             }
           }
         }
    -  }
    -
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
     
    -    synchronized {
    -      val consumer = cache.get(key)
    -      if (consumer != null) {
    -        consumer.inuse = false
    -      } else {
    -        logWarning(s"Attempting to release consumer that does not exist")
    -      }
    -    }
    +    val guavaCache: Cache[CacheKey, CachedKafkaConsumer] =
    --- End diff --
    
    Why bother declaring this `val`?




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @zsxwing, can you please take a look?





[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79907/
    Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77777 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77777/testReport)** for PR 18143 at commit [`288ab39`](https://github.com/apache/spark/commit/288ab39923ee93d08f7b98313c88701e862359cd).




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79908 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79908/testReport)** for PR 18143 at commit [`3c6f7c6`](https://github.com/apache/spark/commit/3c6f7c609889aad7c01249430d1fcc170410cb03).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86270/
    Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79708 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79708/testReport)** for PR 18143 at commit [`a0f1aa3`](https://github.com/apache/spark/commit/a0f1aa383c696d3d754e8709e361e502720be227).




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/139/
    Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    @brkyvz and @zsxwing, there have been no comments so far on whether this will be useful. Should I consider closing it?




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #86270 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86270/testReport)** for PR 18143 at commit [`5840a3c`](https://github.com/apache/spark/commit/5840a3c3481178d8490462a491c7a0b8551e163e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77590/
    Test PASSed.




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119277567
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     
       private lazy val cache = {
         val conf = SparkEnv.get.conf
    -    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
    -    new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
    -      override def removeEldestEntry(
    -        entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
    -        if (entry.getValue.inuse == false && this.size > capacity) {
    -          logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
    -            s"removing consumer for ${entry.getKey}")
    -          try {
    -            entry.getValue.close()
    -          } catch {
    -            case e: SparkException =>
    -              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
    -          }
    -          true
    -        } else {
    -          false
    +    val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"
    --- End diff --
    
    This is used later to construct the warning log message.
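
The diff above drops a hand-rolled LRU map and introduces a named config-key string. As a rough, dependency-free sketch of that removed eviction logic with the key reused in the warning text: `LruSketch`, `Entry`, and `newCache` are hypothetical names, `Entry` stands in for `CachedKafkaConsumer`, and the message wording is an assumption rather than the PR's actual log line.

```scala
import java.{util => ju}

object LruSketch {
  // Key declared once so the warning can name the setting to tune.
  val capacityConfigString: String = "spark.sql.kafkaConsumerCache.capacity"

  // Hypothetical stand-in for CachedKafkaConsumer: only `inuse` and close() matter here.
  final class Entry(var inuse: Boolean) {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Access-ordered LinkedHashMap that drops the least recently used entry
  // once the map grows past `capacity`, unless that entry is still in use.
  def newCache(capacity: Int): ju.LinkedHashMap[String, Entry] =
    new ju.LinkedHashMap[String, Entry](capacity, 0.75f, true) {
      override def removeEldestEntry(e: ju.Map.Entry[String, Entry]): Boolean = {
        if (!e.getValue.inuse && this.size > capacity) {
          System.err.println(
            s"Cache hitting max capacity of $capacity (see $capacityConfigString), " +
              s"removing consumer for ${e.getKey}")
          e.getValue.close()
          true
        } else {
          false
        }
      }
    }
}
```

Note that an in-use eldest entry is never evicted here, so the map can temporarily grow beyond the configured capacity.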




[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18143#discussion_r119636878
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
     
       private case class CacheKey(groupId: String, topic: String, partition: Int)
     
    -  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
    -  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
    +  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
     
       /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
       def init(
           initialCapacity: Int,
           maxCapacity: Int,
           loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
    -    if (null == cache) {
    -      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
    -      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
    -        initialCapacity, loadFactor, true) {
    -        override def removeEldestEntry(
    -          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
    -          if (this.size > maxCapacity) {
    -            try {
    -              entry.getValue.consumer.close()
    -            } catch {
    -              case x: KafkaException =>
    -                logError("Error closing oldest Kafka consumer", x)
    -            }
    -            true
    -          } else {
    -            false
    +    if (cache == null) {
    +      val duration =
    +        SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", "30m")
    --- End diff --
    
    All of the other configs in the dstream are of the form "spark.streaming.kafka.consumer.cache.something". I'm not sure whether it's better to be consistent with the sql config or with the dstream config.
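
For context, a minimal sketch of the kind of Guava cache this diff moves toward: a size cap, an idle timeout, and a removal listener that closes whatever is dropped. It assumes Guava is on the classpath; `FakeConsumer`, `ConsumerCacheSketch`, and `build` are hypothetical names, and the timeout is passed in as a plain number instead of being read from either of the config keys under discussion.

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}

// Hypothetical stand-in for a cached Kafka consumer; only close() matters here.
final class FakeConsumer(val name: String) {
  @volatile var closed = false
  def close(): Unit = { closed = true }
}

object ConsumerCacheSketch {
  // Closes any entry the cache drops: size eviction, idle expiry, or explicit invalidation.
  private val closeOnRemoval = new RemovalListener[String, FakeConsumer] {
    override def onRemoval(n: RemovalNotification[String, FakeConsumer]): Unit = {
      try {
        n.getValue.close()
      } catch {
        case e: Exception =>
          System.err.println(
            s"Error closing consumer ${n.getKey}, evicted due to ${n.getCause}: $e")
      }
    }
  }

  // maxCapacity and idleTimeoutMs would come from configuration in real code.
  def build(maxCapacity: Long, idleTimeoutMs: Long): Cache[String, FakeConsumer] =
    CacheBuilder.newBuilder()
      .maximumSize(maxCapacity)
      .expireAfterAccess(idleTimeoutMs, TimeUnit.MILLISECONDS)
      .removalListener(closeOnRemoval)
      .build[String, FakeConsumer]()
}
```

Guava does not run a dedicated cleanup thread by default; expired entries are removed during later cache operations or an explicit `cleanUp()` call, so an idle consumer may stay open somewhat longer than the configured timeout.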




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79909 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79909/testReport)** for PR 18143 at commit [`1c122ca`](https://github.com/apache/spark/commit/1c122caff1a485fb766f36b71280b3d8ac34595a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Thanks @srowen for taking a look. I have tried to address your comments.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79707 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79707/testReport)** for PR 18143 at commit [`de733f1`](https://github.com/apache/spark/commit/de733f14bbb447de593278b228635e6f1cea6f2c).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #79909 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79909/testReport)** for PR 18143 at commit [`1c122ca`](https://github.com/apache/spark/commit/1c122caff1a485fb766f36b71280b3d8ac34595a).




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Generally looks ok to me, thanks.
    2 questions -
    
    Did you do any testing on workloads to see if performance stayed the same?
    
    Is there a reason not to do the same thing to streaming/kafka010/CachedKafkaConsumer.scala for consistency?





[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    **[Test build #77777 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77777/testReport)** for PR 18143 at commit [`288ab39`](https://github.com/apache/spark/commit/288ab39923ee93d08f7b98313c88701e862359cd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaConsumer ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/18143
  
    Hi @brkyvz, as we discussed on the PR for CachedKafkaProducer, Guava cache can be used for the consumer as well. Maybe you could take a look at this too?

