Posted to reviews@spark.apache.org by ScrapCodes <gi...@git.apache.org> on 2017/03/16 05:56:56 UTC

[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

GitHub user ScrapCodes opened a pull request:

    https://github.com/apache/spark/pull/17308

    [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch.

    ## What changes were proposed in this pull request?
    A new API for cleaning up resources in KafkaSink is added to the Sink trait.
    
    In summary, the cost of recreating a KafkaProducer for every batch is high, as it starts a lot of threads, makes connections, and then closes them. The Kafka docs state that a KafkaProducer instance is thread safe, and reusing one instance while writing from multiple threads is encouraged.
    
    Furthermore, I see a 10x improvement in latency with this patch.
    
    TODO: post exact results.
    
    ## How was this patch tested?
    Running distributed benchmarks comparing runs with this patch and without it.
    Added relevant unit tests.
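
The caching idea described above can be sketched roughly as follows. This is only an illustration, not the code from the patch: `StubProducer` and `StubProducerCache` are hypothetical names, and the stub stands in for `KafkaProducer` so that the snippet runs without a Kafka dependency. It shows one instance being reused for every lookup with the same configuration.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for KafkaProducer: it only counts how many instances are built.
final class StubProducer(val config: Map[String, String]) extends AutoCloseable {
  StubProducer.created.incrementAndGet()
  override def close(): Unit = ()
}

object StubProducer {
  val created = new AtomicInteger(0)
}

// Hypothetical cache: at most one producer per distinct configuration.
object StubProducerCache {
  private val cache = new ConcurrentHashMap[Map[String, String], StubProducer]()

  // computeIfAbsent runs the factory at most once per key, even under concurrent callers.
  def getOrCreate(config: Map[String, String]): StubProducer =
    cache.computeIfAbsent(config, _ => new StubProducer(config))
}
```

Calling `StubProducerCache.getOrCreate` once per batch with the same configuration returns the same instance each time, so the construction cost is paid once rather than on every batch.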


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ScrapCodes/spark cached-kafka-producer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17308
    
----
commit febf3874cf07bad04e574b571f1caa839c9c28b7
Author: Prashant Sharma <pr...@in.ibm.com>
Date:   2017-03-15T11:03:45Z

    [SPARK-19968][SS] Use a cached instance of KafkaProducer instead of creating one every batch.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117604954
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    --- End diff --
    
    Shall we always use `getUniqueId` to get uid?




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77233/testReport)** for PR 17308 at commit [`15dfc80`](https://github.com/apache/spark/commit/15dfc80a8a35208f5f9df150de7c4bd9a015e2d8).




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @brkyvz I am experimenting with Guava cache; the only problem is that it has no built-in mechanism for cleanup on shutdown.
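
One possible way to cover the shutdown case is to close everything from a JVM shutdown hook. The sketch below is an assumption about how that could look, not what the patch does: `ClosingRegistry` is a hypothetical name, it uses a plain `ConcurrentHashMap` and `sys.addShutdownHook` instead of Guava or Spark's `ShutdownHookManager`, and any `AutoCloseable` stands in for a producer.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal

// Hypothetical registry that tracks closeable resources and can close them all at once.
object ClosingRegistry {
  private val resources = new ConcurrentHashMap[String, AutoCloseable]()

  def register(key: String, resource: AutoCloseable): Unit = {
    resources.put(key, resource)
  }

  // Removes and closes every registered resource; returns how many closed cleanly.
  // A failure while closing one resource does not prevent closing the others.
  def closeAll(): Int = {
    var closed = 0
    val it = resources.entrySet().iterator()
    while (it.hasNext) {
      val resource = it.next().getValue
      it.remove()
      try {
        resource.close()
        closed += 1
      } catch {
        case NonFatal(e) => Console.err.println(s"Failed to close resource: $e")
      }
    }
    closed
  }

  // Registered once, when the object is first initialized; runs at JVM shutdown.
  sys.addShutdownHook { closeAll(); () }
}
```

With an expiring cache, the same `close()` call would also need to run when an entry is evicted; the hook above only addresses entries that are still cached when the JVM exits.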




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118083984
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private val cacheExpireTimeout: Long =
    +    System.getProperty("spark.kafka.guava.cache.timeout", "10").toLong
    --- End diff --
    
    probably useful to mention this is in minutes
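
A small sketch of how the unit could be made explicit in code. The property name and the default of 10 come from the diff above; `CacheTimeout` and its member names are hypothetical and only illustrate naming the unit and converting it with `TimeUnit`.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper that makes the unit of the timeout explicit.
object CacheTimeout {
  // Property name and default value as they appear in the diff; the value is in minutes.
  val PropertyName = "spark.kafka.guava.cache.timeout"
  val DefaultMinutes = 10L

  // Reads the timeout in minutes, falling back to the default when the property is unset.
  def timeoutMinutes(): Long =
    Option(System.getProperty(PropertyName)).map(_.trim.toLong).getOrElse(DefaultMinutes)

  // Same timeout converted to milliseconds for APIs that expect them.
  def timeoutMillis(): Long = TimeUnit.MINUTES.toMillis(timeoutMinutes())
}
```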




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r110625538
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    --- End diff --
    
    It is not a good idea to do it like that.
    
    Please correct me if my understanding is wrong: in this particular case Spark does not let the user specify a key or value serializer/deserializer, so `Object` can only be a String, Int, or Long, and for these `hashCode` would work correctly. I am also considering a better way to do it.
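
To illustrate the concern with keying by `hashCode`: the hash of a `java.util.HashMap` is stable for String and boxed numeric values, but two different configurations can still produce the same hash. The sketch below uses the well-known colliding strings "Aa" and "BB"; `ConfigKeyDemo`, `contentKey`, and the `client.id` setting are made up for the example, and keying by sorted content is just one possible alternative, not necessarily what the patch ends up doing.

```scala
import java.{util => ju}

// Hypothetical demo object; the names here do not come from the patch.
object ConfigKeyDemo {
  def config(value: String): ju.HashMap[String, Object] = {
    val m = new ju.HashMap[String, Object]()
    m.put("client.id", value)
    m
  }

  // "Aa" and "BB" have the same String.hashCode, so the two maps hash identically.
  val first: ju.HashMap[String, Object] = config("Aa")
  val second: ju.HashMap[String, Object] = config("BB")

  // Alternative key built from the map's content: sorted (key, value) pairs.
  def contentKey(m: ju.Map[String, Object]): Seq[(String, String)] = {
    val pairs = Seq.newBuilder[(String, String)]
    val it = m.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      pairs += (e.getKey -> String.valueOf(e.getValue))
    }
    pairs.result().sorted
  }
}
```

Here `first.hashCode == second.hashCode` holds even though the maps differ, so a cache keyed by the hash would hand back the wrong producer for one of them, while `contentKey` keeps them apart.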




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117604874
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -683,6 +685,17 @@ class StreamExecution(
         }
       }
     
    +  /** Stops streaming sink safely. */
    +  private def stopSink(): Unit = {
    +      try {
    --- End diff --
    
    nit: the indentation is not the usual one.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77183 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77183/testReport)** for PR 17308 at commit [`e5cd1e6`](https://github.com/apache/spark/commit/e5cd1e607748163cec4a2131ae23224e60317a9d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Just to throw in my two cents, a change like this is definitely needed, as is made clear by the second sentence of the docs:
    
    http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
    
    "The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances."




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77357/
    Test FAILed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077248
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    cacheMap.getOrElse(uid.toString, createKafkaProducer(params))
    +  }
    +
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +
    +    val producer: Option[Producer] = cacheMap.remove(uid)
    +    if (producer.isDefined) {
    +      log.info(s"Closing the KafkaProducer with config: $kafkaParams")
    +      CanonicalizeKafkaParams.remove(kafkaParams)
    +      producer.foreach(_.close())
    +    } else {
    +      log.warn(s"No KafkaProducer found in cache for $kafkaParams.")
    +    }
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def clear(): Unit = {
    +    cacheMap.foreach(x => x._2.close())
    +    cacheMap.clear()
    +  }
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    +
    +  import scala.collection.JavaConverters._
    +
    +  @GuardedBy("this")
    +  private val registryMap = mutable.HashMap[String, String]()
    +
    +  private[kafka010] val sparkKafkaParamsUniqueId: String =
    +    "spark.internal.sql.kafka.params.uniqueId"
    +
    +  private def generateRandomUUID(kafkaParams: String): String = {
    +    val uuid = ju.UUID.randomUUID().toString
    +    logDebug(s"Generating a new unique id:$uuid for kafka params: $kafkaParams")
    +    registryMap.put(kafkaParams, uuid)
    +    uuid
    +  }
    +
    +  private[kafka010] def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    if (kafkaParams.get(sparkKafkaParamsUniqueId) != null) {
    --- End diff --
    
    nit: you don't need the `if-else`
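
The rest of the method is cut off in the diff above, but assuming the two branches simply return `true` and `false`, the comparison can be returned directly. A minimal sketch, with `IsCanonicalizedSketch` as a hypothetical wrapper object and the key copied from the diff:

```scala
import java.{util => ju}

// Hypothetical wrapper object for the example; the key string is copied from the diff.
object IsCanonicalizedSketch {
  val sparkKafkaParamsUniqueId: String = "spark.internal.sql.kafka.params.uniqueId"

  // The comparison already yields a Boolean, so no if-else is needed around it.
  def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean =
    kafkaParams.get(sparkKafkaParamsUniqueId) != null
}
```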




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r116958096
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---
    @@ -94,4 +94,10 @@ private[kafka010] object KafkaWriter extends Logging {
           }
         }
       }
    +
    +  def close(sc: SparkContext, kafkaParams: ju.Map[String, Object]): Unit = {
    +    sc.parallelize(1 to 10000).foreachPartition { iter =>
    +      CachedKafkaProducer.close(kafkaParams)
    +    }
    --- End diff --
    
    This would cause `CachedKafkaProducer.close` to be executed on each executor. I am thinking of a better way to do this.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Build is failing due to "Our attempt to download sbt locally to build/sbt-launch-0.13.13.jar failed. Please install sbt manually from http://www.scala-sbt.org/"




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77358 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77358/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @tcondie and @zsxwing, any comments on this patch? I would be happy if this bug were fixed before 2.2 is released.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77183/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r106344532
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -32,7 +31,7 @@ import org.apache.spark.sql.types.{BinaryType, StringType}
      * automatically trigger task aborts.
      */
     private[kafka010] class KafkaWriteTask(
    -    producerConfiguration: ju.Map[String, Object],
    +    producerConfiguration: ju.HashMap[String, Object],
    --- End diff --
    
    Ideally this should not have been changed, but the parameters are changed to HashMap to avoid converting or casting them later.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @viirya Thank you for taking a look.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77293 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77293/testReport)** for PR 17308 at commit [`039d063`](https://github.com/apache/spark/commit/039d063af502586109afb0ecd135390c4b7d2050).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118388741
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[String, Producer]()
    +
    +  ShutdownHookManager.addShutdownHook { () =>
    +    clear()
    --- End diff --
    
    I think it will close connections as well. That's really not necessary since the process is being shut down.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114841111
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    --- End diff --
    
    Yeah, I don't think this is a good key for the hashmap.  There could be collisions.  We should either assign a unique ID to the sink and thread that through, or come up with some way to canonicalize the set of parameters that create the sink.  The latter might be better since you could maybe reuse the same producer for more than one query.
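
A minimal sketch of the second option, for illustration only and not code from this PR: the parameters are sorted by name and the resulting sequence is used as the cache key, so two maps with the same entries compare equal regardless of insertion order, and lookups rely on structural equality rather than on `hashCode()` alone. The names `ParamsKey` and `canonicalKey` are hypothetical.

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

object ParamsKey {
  // Hypothetical helper: sort the entries by parameter name so that
  // insertion order does not affect the resulting key.
  def canonicalKey(params: ju.Map[String, Object]): Seq[(String, Object)] =
    params.asScala.toSeq.sortBy(_._1)
}
```

A map keyed by such a sequence would hand back the same producer for any two queries whose Kafka parameters are identical, which is the reuse described above.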




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77297/testReport)** for PR 17308 at commit [`1c9f892`](https://github.com/apache/spark/commit/1c9f89226ef37d338032fbf8b4b8a1c527cc5efd).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77357 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77357/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Jenkins, retest this please !




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77499 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77499/testReport)** for PR 17308 at commit [`a10276a`](https://github.com/apache/spark/commit/a10276abc783ad3818821c31f38ce56acc3e9d77).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #74658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74658/testReport)** for PR 17308 at commit [`0b5e2e8`](https://github.com/apache/spark/commit/0b5e2e82d8f190bbf115234511f9c8e24f8e1995).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester `




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #74658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74658/testReport)** for PR 17308 at commit [`0b5e2e8`](https://github.com/apache/spark/commit/0b5e2e82d8f190bbf115234511f9c8e24f8e1995).




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114841510
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    --- End diff --
    
    uber nit: maybe we can define a type alias
    ```scala
    type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    ```
    so that we don't have to write that whole thing over and over




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118338475
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[String, Producer]()
    +
    +  ShutdownHookManager.addShutdownHook { () =>
    +    clear()
    --- End diff --
    
    Do we really need to stop producers in a shutdown hook? I'm asking because stopping a producer is a blocking call and may prevent other shutdown hooks from running.
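
If the hook were kept, one way to limit the blocking concern would be to bound how long the hook waits. The following is a rough sketch under that assumption, not code from this PR: `BoundedClose` and `closeWithin` are hypothetical names, and the `close` argument stands in for whatever call stops the producer.

```scala
object BoundedClose {
  /**
   * Runs `close` on a daemon thread and waits at most `timeoutMs` milliseconds.
   * Returns true if `close` finished within the timeout, false otherwise.
   */
  def closeWithin(timeoutMs: Long)(close: () => Unit): Boolean = {
    // Thread.join(0) would wait forever, so insist on a positive timeout.
    require(timeoutMs > 0, "timeoutMs must be positive")
    val worker = new Thread(new Runnable { def run(): Unit = close() })
    // A daemon thread does not keep the JVM alive if close() never returns.
    worker.setDaemon(true)
    worker.start()
    worker.join(timeoutMs)
    !worker.isAlive
  }
}
```

With this shape the hook gives up after the timeout and lets the remaining shutdown hooks run; the trade-off is that a slow close may be abandoned before buffered records are flushed.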




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17308




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117076256
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    --- End diff --
    
    ditto




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77227 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77227/testReport)** for PR 17308 at commit [`15dfc80`](https://github.com/apache/spark/commit/15dfc80a8a35208f5f9df150de7c4bd9a015e2d8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74658/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #74646 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74646/testReport)** for PR 17308 at commit [`830d76d`](https://github.com/apache/spark/commit/830d76d3ce19aa7fdcafed2a3c67338232d9dbeb).




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118194379
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CanonicalizeKafkaParamsSuite.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.PrivateMethodTester
    --- End diff --
    
    Ah, an oversight. Thanks!




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76872 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76872/testReport)** for PR 17308 at commit [`ea9592a`](https://github.com/apache/spark/commit/ea9592adacfa7b923d1df93d3b40f0ec6a3bc548).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118798407
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -68,11 +67,10 @@ private[kafka010] class KafkaWriteTask(
       }
     
       def close(): Unit = {
    +    checkForErrors()
         if (producer != null) {
    -      checkForErrors
    -      producer.close()
    -      checkForErrors
    -      producer = null
    --- End diff --
    
    nit: please keep `producer = null` for double-close
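
A minimal sketch of the double-close guard requested here, using hypothetical stand-in classes (`FakeProducer`, `WriteTask`) rather than the PR's code. It assumes the task only flushes a producer that it shares with other tasks; clearing the reference after the first `close()` turns any later call into a no-op.

```scala
// Hypothetical stand-in for a shared producer; it only counts flush() calls.
final class FakeProducer {
  var flushCalls: Int = 0
  def flush(): Unit = flushCalls += 1
}

// Hypothetical write task holding a reference to a producer it does not own.
final class WriteTask(shared: FakeProducer) {
  private var producer: FakeProducer = shared

  def close(): Unit = {
    if (producer != null) {
      producer.flush()
      // Drop the reference so that a second close() is a no-op.
      producer = null
    }
  }
}
```

Calling `close()` twice on the same `WriteTask` reaches the underlying producer only once.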




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Taking a look




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77227 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77227/testReport)** for PR 17308 at commit [`15dfc80`](https://github.com/apache/spark/commit/15dfc80a8a35208f5f9df150de7c4bd9a015e2d8).




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118084337
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---
    @@ -19,9 +19,9 @@ package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
     
    +import org.apache.spark.SparkContext
    --- End diff --
    
    unnecessary import?




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @tdas ping !




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Jenkins, retest this please.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077673
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    cacheMap.getOrElse(uid.toString, createKafkaProducer(params))
    +  }
    +
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +
    +    val producer: Option[Producer] = cacheMap.remove(uid)
    +    if (producer.isDefined) {
    +      log.info(s"Closing the KafkaProducer with config: $kafkaParams")
    +      CanonicalizeKafkaParams.remove(kafkaParams)
    +      producer.foreach(_.close())
    +    } else {
    +      log.warn(s"No KafkaProducer found in cache for $kafkaParams.")
    +    }
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def clear(): Unit = {
    +    cacheMap.foreach(x => x._2.close())
    +    cacheMap.clear()
    +  }
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    +
    +  import scala.collection.JavaConverters._
    +
    +  @GuardedBy("this")
    +  private val registryMap = mutable.HashMap[String, String]()
    +
    +  private[kafka010] val sparkKafkaParamsUniqueId: String =
    +    "spark.internal.sql.kafka.params.uniqueId"
    +
    +  private def generateRandomUUID(kafkaParams: String): String = {
    +    val uuid = ju.UUID.randomUUID().toString
    +    logDebug(s"Generating a new unique id:$uuid for kafka params: $kafkaParams")
    +    registryMap.put(kafkaParams, uuid)
    +    uuid
    +  }
    +
    +  private[kafka010] def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    if (kafkaParams.get(sparkKafkaParamsUniqueId) != null) {
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  private[kafka010] def computeUniqueCanonicalForm(
    +    kafkaParams: ju.Map[String, Object]): ju.Map[String, Object] = synchronized {
    +    if (isCanonicalized(kafkaParams)) {
    +      logWarning(s"A unique id,$sparkKafkaParamsUniqueId ->" +
    +        s" ${kafkaParams.get(sparkKafkaParamsUniqueId)}" +
    +        s" already exists in kafka params, returning Kafka Params:$kafkaParams as is.")
    +     kafkaParams
    +    } else {
    +      val sortedMap = SortedMap.empty[String, Object] ++ kafkaParams.asScala
    +      val stringRepresentation: String = sortedMap.mkString("\n")
    +      val uuid =
    +        registryMap.getOrElse(stringRepresentation, generateRandomUUID(stringRepresentation))
    +      val newMap = new ju.HashMap[String, Object](kafkaParams)
    +      newMap.put(sparkKafkaParamsUniqueId, uuid)
    +      newMap
    +    }
    +  }
    +
    +  private[kafka010] def remove(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    val sortedMap = SortedMap.empty[String, Object] ++ kafkaParams.asScala
    +    val stringRepresentation: String = sortedMap.mkString("\n")
    +    registryMap.remove(stringRepresentation).isDefined
    +  }
    +
    +  // For testing purpose only.
    +  private[kafka010] def clear(): Unit = {
    +    registryMap.clear()
    +  }
    +
    --- End diff --
    
    nit: no need for newline
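
The `CanonicalizeKafkaParams` object in the diff quoted above keys its registry on a sorted string form of the Kafka parameters, so that two maps with the same entries share one id. A minimal sketch of that idea follows; `CanonicalKey` is a hypothetical name, and the sketch leaves out the UUID registry and the internal id entry.

```scala
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap

// Hypothetical helper: builds an order-independent string key for a params map.
object CanonicalKey {
  def of(params: ju.Map[String, Object]): String = {
    // Sorting by key makes the result independent of insertion order.
    val sorted = SortedMap(params.asScala.toSeq: _*)
    sorted.mkString("\n")
  }
}
```

Two maps that hold the same entries, inserted in a different order, produce the same key and would therefore resolve to the same cached producer.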




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118175283
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private val cacheExpireTimeout: Long =
    +    System.getProperty("spark.kafka.guava.cache.timeout.minutes", "10").toLong
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .recordStats()
    --- End diff --
    
    Do we use the stats?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @marmbrus Thank you for taking another look. Agreed, a shutdown hook is not ideal for closing Kafka producers. In fact, for the Kafka sink, it might be correct to skip the cleanup step altogether. I have tried to address your comments.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77410/




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114841245
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala ---
    @@ -30,14 +30,19 @@ private[kafka010] class KafkaSink(
       @volatile private var latestBatchId = -1L
     
       override def toString(): String = "KafkaSink"
    +  private val kafkaParams = new ju.HashMap[String, Object](executorKafkaParams)
     
       override def addBatch(batchId: Long, data: DataFrame): Unit = {
         if (batchId <= latestBatchId) {
           logInfo(s"Skipping already committed batch $batchId")
         } else {
           KafkaWriter.write(sqlContext.sparkSession,
    -        data.queryExecution, executorKafkaParams, topic)
    +        data.queryExecution, kafkaParams, topic)
           latestBatchId = batchId
         }
       }
    +
    +  override def stop(): Unit = {
    +    CachedKafkaProducer.close(kafkaParams)
    --- End diff --
    
    This only closes the producer on the driver, right? Do we even create one there?




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114841425
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    +    kafkaProducer
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. A new KafkaProducer will be created,
    +   * if matching KafkaProducer doesn't exist. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  def getOrCreate(kafkaParams: ju.HashMap[String, Object])
    +  : KafkaProducer[Array[Byte], Array[Byte]] = synchronized {
    --- End diff --
    
    nit: Typically we wrap the arguments rather than the return type
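
A small sketch of the layout this nit asks for, assuming a four-space continuation indent for wrapped arguments. `WrapStyle` and its `Producer` alias are hypothetical stand-ins for the long `KafkaProducer[Array[Byte], Array[Byte]]` signature in the diff; the body is a placeholder.

```scala
import java.{util => ju}

object WrapStyle {
  // Hypothetical alias standing in for a long producer type.
  type Producer = String

  // The argument list is wrapped onto its own line; the return type stays
  // on the same line as the closing parenthesis.
  def getOrCreate(
      kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    s"producer-for-${kafkaParams.size}-params"
  }
}
```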




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @ScrapCodes It should have a `clear` method for testing. We can't guarantee that shutdown is called on the executors when the Sink's `stop` method is called anyway, which is why I suggested the timeout solution. If you really want, I think you can also use the `ShutdownHookManager` and call clean on the cache as well.
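
A rough sketch of the timeout idea described here. The PR's diff builds on a Guava cache with a removal listener; the code below deliberately does not use Guava. Instead it uses a hypothetical, stdlib-only `ExpiringCache` with an injectable clock, so the eviction behaviour is easy to follow and to test. The `onEvict` callback marks the place where a real sink would close the producer.

```scala
import scala.collection.mutable

// Hypothetical stdlib-only cache: entries idle for ttlMillis or longer are
// evicted and handed to onEvict.
final class ExpiringCache[K, V](ttlMillis: Long, clock: () => Long, onEvict: V => Unit) {
  // Each value is stored together with the time of its last access.
  private val entries = mutable.HashMap.empty[K, (V, Long)]

  // Return the cached value for key, creating it on a miss, and refresh its access time.
  def getOrCreate(key: K)(create: => V): V = synchronized {
    evictExpired()
    val value = entries.get(key) match {
      case Some((v, _)) => v
      case None => create
    }
    entries.put(key, (value, clock()))
    value
  }

  // Remove every entry that has been idle for at least ttlMillis.
  def evictExpired(): Unit = synchronized {
    val now = clock()
    val expired = entries.collect { case (k, (_, last)) if now - last >= ttlMillis => k }.toList
    expired.foreach { k => entries.remove(k).foreach { case (v, _) => onEvict(v) } }
  }

  // Evict everything, for example from a test or a shutdown hook.
  def clear(): Unit = synchronized {
    val all = entries.values.map(_._1).toList
    entries.clear()
    all.foreach(onEvict)
  }

  def size: Int = synchronized(entries.size)
}
```

With this shape, executors need no explicit `stop` signal: a producer that is no longer requested simply ages out, and `clear()` covers the testing and shutdown-hook cases mentioned above.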




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76890 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76890/testReport)** for PR 17308 at commit [`8224596`](https://github.com/apache/spark/commit/82245967eb618e3bdd865c200df7aaca2e41a7a6).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    LGTM. Merging to master and 2.2. Thanks!




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117805799
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
    +  private val cacheExpireTimeout: Long = System.getProperty("spark.kafka.guava.cache.timeout",
    --- End diff --
    
    Scratch that. We will only have a single cache... 




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077390
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    cacheMap.getOrElse(uid.toString, createKafkaProducer(params))
    +  }
    +
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +
    +    val producer: Option[Producer] = cacheMap.remove(uid)
    +    if (producer.isDefined) {
    +      log.info(s"Closing the KafkaProducer with config: $kafkaParams")
    +      CanonicalizeKafkaParams.remove(kafkaParams)
    +      producer.foreach(_.close())
    +    } else {
    +      log.warn(s"No KafkaProducer found in cache for $kafkaParams.")
    +    }
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def clear(): Unit = {
    +    cacheMap.foreach(x => x._2.close())
    +    cacheMap.clear()
    +  }
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    +
    +  import scala.collection.JavaConverters._
    +
    +  @GuardedBy("this")
    +  private val registryMap = mutable.HashMap[String, String]()
    +
    +  private[kafka010] val sparkKafkaParamsUniqueId: String =
    +    "spark.internal.sql.kafka.params.uniqueId"
    +
    +  private def generateRandomUUID(kafkaParams: String): String = {
    +    val uuid = ju.UUID.randomUUID().toString
    +    logDebug(s"Generating a new unique id:$uuid for kafka params: $kafkaParams")
    +    registryMap.put(kafkaParams, uuid)
    +    uuid
    +  }
    +
    +  private[kafka010] def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    if (kafkaParams.get(sparkKafkaParamsUniqueId) != null) {
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  private[kafka010] def computeUniqueCanonicalForm(
    +    kafkaParams: ju.Map[String, Object]): ju.Map[String, Object] = synchronized {
    +    if (isCanonicalized(kafkaParams)) {
    +      logWarning(s"A unique id,$sparkKafkaParamsUniqueId ->" +
    --- End diff --
    
    nit: space after `,`




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077740
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
    @@ -70,13 +70,13 @@ import org.apache.spark.unsafe.types.UTF8String
      * and not use wrong broker addresses.
      */
     private[kafka010] class KafkaSource(
    -                                     sqlContext: SQLContext,
    -                                     kafkaReader: KafkaOffsetReader,
    -                                     executorKafkaParams: ju.Map[String, Object],
    -                                     sourceOptions: Map[String, String],
    -                                     metadataPath: String,
    -                                     startingOffsets: KafkaOffsetRangeLimit,
    -                                     failOnDataLoss: Boolean)
    +  sqlContext: SQLContext,
    --- End diff --
    
    4 spaces please




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118387956
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[String, Producer]()
    +
    +  ShutdownHookManager.addShutdownHook { () =>
    +    clear()
    --- End diff --
    
    +1, this seems complicated.  What exactly does shutdown do?  Is it just cleaning up thread pools?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118389082
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[String, Producer]()
    +
    +  ShutdownHookManager.addShutdownHook { () =>
    +    clear()
    +  }
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    --- End diff --
    
    nit: indent 4 here




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117605188
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CanonicalizeKafkaParamsSuite.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.PrivateMethodTester
    +
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class CanonicalizeKafkaParamsSuite extends SharedSQLContext with PrivateMethodTester {
    +
    +  test("Same unique id is returned for same set of kafka parameters") {
    +    CanonicalizeKafkaParams.clear()
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    // Here only host should be resolvable, it does not need a running instance of kafka server.
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val kp2 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    val kafkaParams2 = new ju.HashMap[String, Object](kafkaParams)
    +    val kp3 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams2)
    +    val uid1 = kp2.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId).toString
    +    val uid2 = kp3.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId).toString
    +    assert(uid1 == uid2)
    +  }
    +
    +  test("New unique id is generated for any modification in kafka parameters.") {
    +    CanonicalizeKafkaParams.clear()
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    // Here only host should be resolvable, it does not need a running instance of kafka server.
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val kp2 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    --- End diff --
    
    CanonicalKP?




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118178655
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CanonicalizeKafkaParamsSuite.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.PrivateMethodTester
    --- End diff --
    
    Do we use this import?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74646/




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r116158469
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala ---
    @@ -30,14 +30,19 @@ private[kafka010] class KafkaSink(
       @volatile private var latestBatchId = -1L
     
       override def toString(): String = "KafkaSink"
    +  private val kafkaParams = new ju.HashMap[String, Object](executorKafkaParams)
     
       override def addBatch(batchId: Long, data: DataFrame): Unit = {
         if (batchId <= latestBatchId) {
           logInfo(s"Skipping already committed batch $batchId")
         } else {
           KafkaWriter.write(sqlContext.sparkSession,
    -        data.queryExecution, executorKafkaParams, topic)
    +        data.queryExecution, kafkaParams, topic)
           latestBatchId = batchId
         }
       }
    +
    +  override def stop(): Unit = {
    +    CachedKafkaProducer.close(kafkaParams)
    --- End diff --
    
    That's correct, understood. `close` requires a bit of rethinking; I am unable to see a straightforward way of doing it. If you agree, the close-related implementation can be taken out of this PR and taken up in a separate JIRA and PR?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @marmbrus @zsxwing @tdas This needs attention from someone




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77021/testReport)** for PR 17308 at commit [`3ec9981`](https://github.com/apache/spark/commit/3ec9981d4148b58df1f0e7c029762d5a6d3dd446).




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r109419942
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    --- End diff --
    
    Just curious: is it a good idea to key the producer map by the hash code of a map whose values are `Object`s?
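
To illustrate the concern, here is a minimal sketch (not code from the PR) of how two different parameter maps can share one `hashCode`, and so collide on an `Int` cache key. The object `ProducerCacheKeySketch`, the helper `cacheKey`, and the `client.id` values are hypothetical names chosen for the example. The alternative shown, keying by the sorted entries themselves, is only one possible approach and assumes every value has a meaningful `equals`.

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

object ProducerCacheKeySketch {

  // Hypothetical helper: build an order-independent, value-based key
  // from the entries of the params map instead of using its hashCode.
  def cacheKey(params: ju.Map[String, Object]): Seq[(String, Object)] =
    params.asScala.toSeq.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" are distinct strings with the same String.hashCode,
    // so the two maps below are unequal but hash to the same Int.
    val a = new ju.HashMap[String, Object]()
    a.put("client.id", "Aa")
    val b = new ju.HashMap[String, Object]()
    b.put("client.id", "BB")

    println(a.hashCode == b.hashCode)   // true: an Int-keyed cache would mix them up
    println(cacheKey(a) == cacheKey(b)) // false: value-based keys stay distinct
  }
}
```

With an `Int` key, a lookup for the second configuration would return the producer created for the first one. A value-based key avoids that, at the cost of comparing all entries on each lookup.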




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77358 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77358/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76890 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76890/testReport)** for PR 17308 at commit [`8224596`](https://github.com/apache/spark/commit/82245967eb618e3bdd865c200df7aaca2e41a7a6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77410 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77410/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76869 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76869/testReport)** for PR 17308 at commit [`2a15afe`](https://github.com/apache/spark/commit/2a15afe326ba7660ff991811fa200c796113b146).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77297/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #74646 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74646/testReport)** for PR 17308 at commit [`830d76d`](https://github.com/apache/spark/commit/830d76d3ce19aa7fdcafed2a3c67338232d9dbeb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77225/
    Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Jenkins, retest this please!




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Hi @marmbrus and @brkyvz, thanks a lot for taking a look.
    
    @marmbrus You are right, we should have another way to canonicalize the Kafka params. I can only think of appending a unique id to the Kafka params and somehow ensuring that a particular set of params gets the same uid every time.
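
One alternative to a generated uid, sketched here under assumptions, is to derive the cache key from the params themselves in an order-independent form. The helper name `toKey` and the object `CanonicalParams` are hypothetical, and the sketch only works when every value has a meaningful `equals`.

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

object CanonicalParams {
  // Hypothetical helper: sort the entries by key so that two maps with the
  // same contents yield equal keys regardless of insertion order.
  def toKey(params: ju.Map[String, Object]): Seq[(String, Object)] =
    params.asScala.toSeq.sortBy(_._1)
}
```

Because the key compares entries by value, a hash collision between two different configurations no longer selects the wrong producer; the registry of generated ids described above is the option to consider when values lack a usable `equals`.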




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76868/testReport)** for PR 17308 at commit [`932d563`](https://github.com/apache/spark/commit/932d5638feed4fd5c602da27129b5bc4b81ff897).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77183 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77183/testReport)** for PR 17308 at commit [`e5cd1e6`](https://github.com/apache/spark/commit/e5cd1e607748163cec4a2131ae23224e60317a9d).




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77227/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77233 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77233/testReport)** for PR 17308 at commit [`15dfc80`](https://github.com/apache/spark/commit/15dfc80a8a35208f5f9df150de7c4bd9a015e2d8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76869/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077304
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    cacheMap.getOrElse(uid.toString, createKafkaProducer(params))
    +  }
    +
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +
    +    val producer: Option[Producer] = cacheMap.remove(uid)
    +    if (producer.isDefined) {
    +      log.info(s"Closing the KafkaProducer with config: $kafkaParams")
    +      CanonicalizeKafkaParams.remove(kafkaParams)
    +      producer.foreach(_.close())
    +    } else {
    +      log.warn(s"No KafkaProducer found in cache for $kafkaParams.")
    +    }
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def clear(): Unit = {
    +    cacheMap.foreach(x => x._2.close())
    +    cacheMap.clear()
    +  }
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    +
    +  import scala.collection.JavaConverters._
    +
    +  @GuardedBy("this")
    +  private val registryMap = mutable.HashMap[String, String]()
    +
    +  private[kafka010] val sparkKafkaParamsUniqueId: String =
    +    "spark.internal.sql.kafka.params.uniqueId"
    +
    +  private def generateRandomUUID(kafkaParams: String): String = {
    +    val uuid = ju.UUID.randomUUID().toString
    +    logDebug(s"Generating a new unique id:$uuid for kafka params: $kafkaParams")
    +    registryMap.put(kafkaParams, uuid)
    +    uuid
    +  }
    +
    +  private[kafka010] def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    if (kafkaParams.get(sparkKafkaParamsUniqueId) != null) {
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  private[kafka010] def computeUniqueCanonicalForm(
    +    kafkaParams: ju.Map[String, Object]): ju.Map[String, Object] = synchronized {
    --- End diff --
    
    ditto on indent




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76872/
    Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77499/
    Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118799879
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
    +    override def onRemoval(
    +        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
    +      val paramsSeq: Seq[(String, Object)] = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(
    +        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
    +      close(paramsSeq, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[Seq[(String, Object)], Producer]()
    --- End diff --
    
    nit: Use `build(CacheLoader<? super K1, V1> loader)` to use LoadingCache, then `getOrCreate` will be very simple.
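
The simplification suggested above comes from letting the cache itself create missing entries. The sketch below shows that load-on-miss pattern with the standard library's `ConcurrentHashMap.computeIfAbsent` as a stand-in for Guava's `LoadingCache`; it has no expiry or removal listener, and `FakeProducer` and `LoadOnMissCache` are hypothetical names used only for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

object LoadOnMissCache {
  // Stand-in for an expensive resource such as a producer.
  final case class FakeProducer(id: Int, config: String)

  // Counts how many times the loader actually ran.
  val created = new AtomicInteger(0)

  private val cache = new ConcurrentHashMap[String, FakeProducer]()

  // Runs only when the key is absent, playing the role of a cache loader.
  private val loader = new JFunction[String, FakeProducer] {
    override def apply(config: String): FakeProducer =
      FakeProducer(created.incrementAndGet(), config)
  }

  // With loading delegated to the cache, getOrCreate is a single call.
  def getOrCreate(config: String): FakeProducer =
    cache.computeIfAbsent(config, loader)
}
```

Repeated calls with the same key return the same instance, and the lookup-then-create logic no longer has to be written out and synchronized by hand.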




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118186083
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -36,7 +36,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
       private val cacheExpireTimeout: Long =
    -    System.getProperty("spark.kafka.guava.cache.timeout", "10").toLong
    +    System.getProperty("spark.kafka.guava.cache.timeout.minutes", "10").toLong
    --- End diff --
    
    Don't we need to get this from `SparkEnv`, by the way? I don't know if the properties get populated properly.
    Also, adding `minutes` to the conf makes it kinda long, right? I think we can also replace `guava` with `producer`.
    I think it may also be better to use [this](https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L213) so that we get rid of `minutes` and users can provide arbitrary durations (hours if they want). I think that's what we generally use for `duration` type confs.
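
To illustrate why a duration-typed conf removes the need for a unit in the key name, here is a simplified sketch of parsing values such as `10m` or `2h` into milliseconds. It is not Spark's implementation; `DurationConf`, `parseDurationMs`, and the supported suffixes are assumptions made for this example.

```scala
import java.util.concurrent.TimeUnit

object DurationConf {
  // Digits followed by an optional lowercase unit suffix.
  private val Pattern = "([0-9]+)([a-z]+)?".r

  private val units: Map[String, TimeUnit] = Map(
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  // Convert a duration string to milliseconds; a bare number is read as ms.
  def parseDurationMs(str: String): Long = str.trim.toLowerCase match {
    case Pattern(num, suffix) =>
      val unit = Option(suffix).map { s =>
        units.getOrElse(s, throw new NumberFormatException(s"Invalid suffix: $s"))
      }.getOrElse(TimeUnit.MILLISECONDS)
      unit.toMillis(num.toLong)
    case other =>
      throw new NumberFormatException(s"Failed to parse time string: $other")
  }
}
```

With this shape the user chooses the unit in the value, so the key can stay short and a default such as `10m` reads naturally.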




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118718340
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
    +    override def onRemoval(
    +        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
    +      val paramsSeq: Seq[(String, Object)] = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(
    +        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
    +      close(paramsSeq, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[Seq[(String, Object)], Producer]()
    +
    +  private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(producerConfiguration)
    +    guavaCache.put(paramsSeq, kafkaProducer)
    +    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
    +    Option(guavaCache.getIfPresent(paramsSeq)).getOrElse(createKafkaProducer(kafkaParams))
    +  }
    +
    +  def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
    --- End diff --
    
    nit: seems this can be `private`?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74644/




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77499 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77499/testReport)** for PR 17308 at commit [`a10276a`](https://github.com/apache/spark/commit/a10276abc783ad3818821c31f38ce56acc3e9d77).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76890/




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77021 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77021/testReport)** for PR 17308 at commit [`3ec9981`](https://github.com/apache/spark/commit/3ec9981d4148b58df1f0e7c029762d5a6d3dd446).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118200287
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -36,7 +36,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
       private val cacheExpireTimeout: Long =
    -    System.getProperty("spark.kafka.guava.cache.timeout", "10").toLong
    +    System.getProperty("spark.kafka.guava.cache.timeout.minutes", "10").toLong
    --- End diff --
    
    Thanks, you are right!




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117605102
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala ---
    @@ -22,6 +22,7 @@ import java.{util => ju}
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.{DataFrame, SQLContext}
     import org.apache.spark.sql.execution.streaming.Sink
    +import org.apache.spark.util.Utils
    --- End diff --
    
    Do we need to import this?




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118720547
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
    +    override def onRemoval(
    +        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
    +      val paramsSeq: Seq[(String, Object)] = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(
    +        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
    +      close(paramsSeq, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[Seq[(String, Object)], Producer]()
    +
    +  private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(producerConfiguration)
    +    guavaCache.put(paramsSeq, kafkaProducer)
    +    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
    +    Option(guavaCache.getIfPresent(paramsSeq)).getOrElse(createKafkaProducer(kafkaParams))
    +  }
    +
    +  def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
    +    val paramsSeq: Seq[(String, Object)] =
    +      kafkaParams.asScala.toSeq.sortBy(x => (x._1, x._2.toString))
    --- End diff --
    
    As it is a map, the keys are unique, so it seems we can just sort by `x._1`?
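
A minimal sketch of the suggestion above, assuming the only goal is a deterministic cache key. The object name `ParamsKey` is hypothetical and not part of the patch. Because a map cannot hold the same key twice, ordering by key alone is already total, so the value never has to act as a tie-breaker.

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

// Hypothetical helper, not the code from the patch.
object ParamsKey {
  // Keys of a map are unique, so sorting by key alone yields the same
  // sequence regardless of the map's iteration order.
  def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] =
    kafkaParams.asScala.toSeq.sortBy(_._1)
}
```

Two maps holding the same entries in a different insertion order then produce equal sequences, which is what a cache key needs.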




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    LGTM! @marmbrus @viirya do you have any more feedback?




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117804651
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
    +  private val cacheExpireTimeout: Long = System.getProperty("spark.kafka.guava.cache.timeout",
    +    "10").toLong
    +
    +  private val noneReturningLoader = new CacheLoader[String, Option[Producer]] {
    --- End diff --
    
    The builder shouldn't **require** a loader.
    https://google.github.io/guava/releases/16.0/api/docs/com/google/common/cache/CacheBuilder.html#build()
    Not sure we use the same Guava version, though.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118719834
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.ConcurrentMap
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.PrivateMethodTester
    +
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester {
    +
    +  type KP = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  protected override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    val clear = PrivateMethod[Unit]('clear)
    +    CachedKafkaProducer.invokePrivate(clear())
    +  }
    +
    +  test("Should return the cached instance on calling getOrCreate with same params.") {
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    // Here only host should be resolvable, it does not need a running instance of kafka server.
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
    +    val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
    +    assert(producer == producer2)
    +
    +    val cacheMap = PrivateMethod[ConcurrentMap[String, Option[KP]]]('getAsMap)
    +    val map = CachedKafkaProducer.invokePrivate(cacheMap())
    +    assert(map.size == 1)
    +  }
    +
    +  test("Should close the correct kafka producer for the given kafkaPrams.") {
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
    +    kafkaParams.put("acks", "1")
    +    val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
    +    // With updated conf, a new producer instance should be created.
    +    assert(producer != producer2)
    +
    +    val cacheMap = PrivateMethod[ConcurrentMap[String, Option[KP]]]('getAsMap)
    +    val map = CachedKafkaProducer.invokePrivate(cacheMap())
    +    assert(map.size == 2)
    +
    +    CachedKafkaProducer.close(kafkaParams)
    +    val map2 = CachedKafkaProducer.invokePrivate(cacheMap())
    +    assert(map2.size == 1)
    --- End diff --
    
    This assert only tells us that one KP remains. It seems we should also verify that the correct KP was closed?
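
One way to read this request, sketched with stand-ins rather than the real classes: `FakeProducer` and `CloseCheckSketch` are hypothetical, and the cache here is keyed by the `acks` value only to keep the example short. The point is that the check looks at which entry remains and which instance was closed, not just at the remaining count.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real test would use CachedKafkaProducer and KafkaProducer.
object CloseCheckSketch {
  // Records whether close() was called, nothing more.
  final class FakeProducer(val acks: String) {
    @volatile var closed = false
    def close(): Unit = { closed = true }
  }

  private val cache = mutable.Map.empty[String, FakeProducer]

  def getOrCreate(acks: String): FakeProducer = synchronized {
    cache.getOrElseUpdate(acks, new FakeProducer(acks))
  }

  // Removes the entry for the given key and closes only that producer.
  def close(acks: String): Unit = synchronized {
    cache.remove(acks).foreach(_.close())
  }

  def cachedKeys: Set[String] = synchronized { cache.keySet.toSet }
}
```

A test in this style would assert that the surviving key is the one that was not closed, and that `getOrCreate` for that key still returns the original instance.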




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118799350
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
    +    override def onRemoval(
    +        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
    +      val paramsSeq: Seq[(String, Object)] = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(
    +        s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
    +      close(paramsSeq, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[Seq[(String, Object)], Producer]()
    +
    +  private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(producerConfiguration)
    +    guavaCache.put(paramsSeq, kafkaProducer)
    +    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
    +    Option(guavaCache.getIfPresent(paramsSeq)).getOrElse(createKafkaProducer(kafkaParams))
    --- End diff --
    
    Remove `synchronized`, and after changing to use `LoadingCache`, throw the inner exception instead, such as:
    ```
      private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
        val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
        try {
          guavaCache.get(paramsSeq)
        } catch {
          case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
            if e.getCause != null =>
              throw e.getCause
        }
      }
    ```
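
The reason `synchronized` can go is that a loading cache creates the value for an absent key atomically. The sketch below shows the same property using `java.util.concurrent.ConcurrentHashMap.computeIfAbsent` as a standard-library stand-in; it is not Guava's `LoadingCache`, it does not wrap loader exceptions the way the snippet above expects, and `FakeProducer` and `AtomicGetOrCreateSketch` are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Stand-in sketch: per-key atomic get-or-create without a global lock.
object AtomicGetOrCreateSketch {
  // Hypothetical placeholder for a producer instance.
  final class FakeProducer(val key: String)

  // Counts how many times the loader actually ran.
  val created = new AtomicInteger(0)

  private val cache = new ConcurrentHashMap[String, FakeProducer]()

  private val loader = new JFunction[String, FakeProducer] {
    override def apply(key: String): FakeProducer = {
      created.incrementAndGet()
      new FakeProducer(key)
    }
  }

  // computeIfAbsent applies the loader at most once per absent key,
  // so concurrent callers for the same key share a single instance.
  def getOrCreate(key: String): FakeProducer = cache.computeIfAbsent(key, loader)
}
```

With several threads requesting the same key at once, the loader should run a single time and every caller should receive the same object.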




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117078211
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---
    @@ -94,4 +94,10 @@ private[kafka010] object KafkaWriter extends Logging {
           }
         }
       }
    +
    +  def close(sc: SparkContext, kafkaParams: ju.Map[String, Object]): Unit = {
    +    sc.parallelize(1 to 10000).foreachPartition { iter =>
    +      CachedKafkaProducer.close(kafkaParams)
    +    }
    --- End diff --
    
    AFAIK the KafkaSource also faces the same issue of not being able to close consumers. Can we use a guava cache with a (configurable) timeout? I guess that's the safest way to make sure that they'll eventually get closed.
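
To make the idea concrete, here is a hand-rolled sketch of an expire-after-access cache that closes clients on eviction. It is only an illustration of the behaviour being asked for, not Guava and not the code in this PR: `ExpiringCacheSketch`, `FakeClient`, and the injected `now` clock are all assumptions made so the example runs without sleeping.

```scala
import scala.collection.mutable

object ExpiringCacheSketch {
  // Hypothetical stand-in for a Kafka producer or consumer.
  final class FakeClient(val name: String) {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Minimal expire-after-access cache. Eviction happens lazily, when the
  // cache is touched or when evictExpired() is called explicitly.
  final class ExpiringCache(timeoutMs: Long, now: () => Long) {
    private val entries = mutable.Map.empty[String, (FakeClient, Long)]

    def getOrCreate(key: String): FakeClient = synchronized {
      evictExpired()
      val client = entries.get(key).map(_._1).getOrElse(new FakeClient(key))
      entries(key) = (client, now()) // refresh the last-access time
      client
    }

    // Close and drop every entry idle for longer than timeoutMs.
    def evictExpired(): Unit = synchronized {
      val cutoff = now() - timeoutMs
      val expired =
        entries.filter { case (_, (_, lastAccess)) => lastAccess < cutoff }.keys.toList
      expired.foreach { key => entries.remove(key).foreach(_._1.close()) }
    }

    def size: Int = synchronized { entries.size }
  }
}
```

The design choice this illustrates is that nobody has to call `close` explicitly at the end of a query: an idle client is closed the next time the cache is used after the timeout has passed.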




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    I can further confirm from the logs that a KafkaProducer instance is created almost every instant.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    @marmbrus and @brkyvz Please take another look, and let me know how it can be improved further. Thank you for the help so far.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117605187
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CanonicalizeKafkaParamsSuite.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.PrivateMethodTester
    +
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class CanonicalizeKafkaParamsSuite extends SharedSQLContext with PrivateMethodTester {
    +
    +  test("Same unique id is returned for same set of kafka parameters") {
    +    CanonicalizeKafkaParams.clear()
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    // Here only host should be resolvable, it does not need a running instance of kafka server.
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val kp2 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    val kafkaParams2 = new ju.HashMap[String, Object](kafkaParams)
    +    val kp3 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams2)
    +    val uid1 = kp2.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId).toString
    +    val uid2 = kp3.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId).toString
    +    assert(uid1 == uid2)
    +  }
    +
    +  test("New unique id is generated for any modification in kafka parameters.") {
    +    CanonicalizeKafkaParams.clear()
    +    val kafkaParams = new ju.HashMap[String, Object]()
    +    kafkaParams.put("acks", "0")
    +    // Here only host should be resolvable, it does not need a running instance of kafka server.
    +    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
    +    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
    +    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
    +    val kp2 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    val kafkaParams2 = new ju.HashMap[String, Object](kafkaParams)
    +    kafkaParams2.put("acks", "1")
    +    val kp3 = CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams2)
    --- End diff --
    
    CanonicalKP2?
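
For context, the behaviour the two tests in the diff above exercise (equal parameters yield the same id, any change yields a new one) can be sketched without Spark or Kafka as below. This is a minimal illustration, not the patch's implementation: `ParamRegistrySketch` and `uniqueIdFor` are hypothetical names, and keying the registry on the key-sorted parameters is an assumption.

```scala
import java.util.UUID
import scala.collection.mutable

object ParamRegistrySketch {
  // Hypothetical registry: key-sorted parameters -> generated unique id.
  private val registry = mutable.HashMap[Seq[(String, String)], String]()

  /** Returns the same id for equal parameter maps and a fresh id otherwise. */
  def uniqueIdFor(params: Map[String, String]): String = synchronized {
    // Sort by key so that insertion order does not affect the lookup.
    val canonical = params.toSeq.sortBy(_._1)
    registry.getOrElseUpdate(canonical, UUID.randomUUID().toString)
  }

  // Mirrors the clear() call the tests make before each run.
  def clear(): Unit = synchronized(registry.clear())
}
```

Calling `uniqueIdFor` twice with `acks -> 0` returns one id; changing `acks` to `1` returns a different one.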




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77357 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77357/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77297/testReport)** for PR 17308 at commit [`1c9f892`](https://github.com/apache/spark/commit/1c9f89226ef37d338032fbf8b4b8a1c527cc5efd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77293/




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117077446
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = producerConfiguration.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    cacheMap.put(uid.toString, kafkaProducer)
    +    log.debug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(
    +    kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    cacheMap.getOrElse(uid.toString, createKafkaProducer(params))
    +  }
    +
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +
    +    val producer: Option[Producer] = cacheMap.remove(uid)
    +    if (producer.isDefined) {
    +      log.info(s"Closing the KafkaProducer with config: $kafkaParams")
    +      CanonicalizeKafkaParams.remove(kafkaParams)
    +      producer.foreach(_.close())
    +    } else {
    +      log.warn(s"No KafkaProducer found in cache for $kafkaParams.")
    +    }
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def clear(): Unit = {
    +    cacheMap.foreach(x => x._2.close())
    +    cacheMap.clear()
    +  }
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    +
    +  import scala.collection.JavaConverters._
    +
    +  @GuardedBy("this")
    +  private val registryMap = mutable.HashMap[String, String]()
    +
    +  private[kafka010] val sparkKafkaParamsUniqueId: String =
    +    "spark.internal.sql.kafka.params.uniqueId"
    +
    +  private def generateRandomUUID(kafkaParams: String): String = {
    +    val uuid = ju.UUID.randomUUID().toString
    +    logDebug(s"Generating a new unique id:$uuid for kafka params: $kafkaParams")
    +    registryMap.put(kafkaParams, uuid)
    +    uuid
    +  }
    +
    +  private[kafka010] def isCanonicalized(kafkaParams: ju.Map[String, Object]): Boolean = {
    +    if (kafkaParams.get(sparkKafkaParamsUniqueId) != null) {
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  private[kafka010] def computeUniqueCanonicalForm(
    +    kafkaParams: ju.Map[String, Object]): ju.Map[String, Object] = synchronized {
    +    if (isCanonicalized(kafkaParams)) {
    +      logWarning(s"A unique id,$sparkKafkaParamsUniqueId ->" +
    +        s" ${kafkaParams.get(sparkKafkaParamsUniqueId)}" +
    +        s" already exists in kafka params, returning Kafka Params:$kafkaParams as is.")
    --- End diff --
    
    space after `:`




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77233/




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117076015
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  @GuardedBy("this")
    +  private val cacheMap = new mutable.HashMap[String, Producer]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    --- End diff --
    
    nit: indent 2 more spaces
    ```scala
    private def createKafkaProducer(
        producerConfiguration: ju.Map[String, Object]): Producer = {
      ...
    }
    ```




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77021/




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #74644 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74644/testReport)** for PR 17308 at commit [`febf387`](https://github.com/apache/spark/commit/febf3874cf07bad04e574b571f1caa839c9c28b7).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77410 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77410/testReport)** for PR 17308 at commit [`588fa03`](https://github.com/apache/spark/commit/588fa03d299328405121534b114716cd00bc3a48).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Thanks @viirya and @zsxwing. I have tried to address your comments.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76872 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76872/testReport)** for PR 17308 at commit [`ea9592a`](https://github.com/apache/spark/commit/ea9592adacfa7b923d1df93d3b40f0ec6a3bc548).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76869/testReport)** for PR 17308 at commit [`2a15afe`](https://github.com/apache/spark/commit/2a15afe326ba7660ff991811fa200c796113b146).




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117804270
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
    +  private val cacheExpireTimeout: Long = System.getProperty("spark.kafka.guava.cache.timeout",
    +    "10").toLong
    +
    +  private val noneReturningLoader = new CacheLoader[String, Option[Producer]] {
    +    override def load(key: String): Option[Producer] = {
    +      None
    +    }
    +  }
    +
    +  private val removalListener = new RemovalListener[String, Option[Producer]]() {
    +    override def onRemoval(notification: RemovalNotification[String, Option[Producer]]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer = notification.getValue
    +      log.debug(s"Evicting kafka producer $producer uid:$uid, due to ${notification.getCause}")
    --- End diff --
    
    space after `:`




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77293 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77293/testReport)** for PR 17308 at commit [`039d063`](https://github.com/apache/spark/commit/039d063af502586109afb0ecd135390c4b7d2050).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    SPARK-20737 has been created to track the cleanup mechanism as a separate JIRA.




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117803947
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
    +  private val cacheExpireTimeout: Long = System.getProperty("spark.kafka.guava.cache.timeout",
    --- End diff --
    
    I wonder if this should be an option of the sink? I may be running multiple streams in a single cluster with different trigger intervals which would require different configs. 
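
A minimal sketch of the per-sink idea raised in this comment, for illustration only. The option key `kafka.producer.cache.timeout`, the object `SinkCacheTimeout`, and the helper `resolveCacheTimeout` are hypothetical and not part of the patch; only the property name `spark.kafka.guava.cache.timeout` and its default of `10` come from the diff.

```scala
object SinkCacheTimeout {
  // Hypothetical per-sink option key; not part of the patch under review.
  val SinkOptionKey = "kafka.producer.cache.timeout"
  // JVM-wide property name and default value, as they appear in the diff.
  val GlobalKey = "spark.kafka.guava.cache.timeout"
  val DefaultTimeout = 10L

  /** Prefer the sink's own option, then the JVM-wide property, then the default. */
  def resolveCacheTimeout(sinkOptions: Map[String, String]): Long =
    sinkOptions.get(SinkOptionKey)
      .orElse(Option(System.getProperty(GlobalKey)))
      .map(_.trim.toLong)
      .getOrElse(DefaultTimeout)
}
```

With a lookup like this, two streams in the same cluster could each pass their own value, while streams that set nothing would keep the global behaviour.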




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114842616
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +85,7 @@ private[kafka010] class KafkaWriteTask(
           case t =>
             throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
               s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
    -          s"must be a ${StringType}")
    +          s"must be a $StringType")
    --- End diff --
    
    don't need the `s` and `$` right?
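
To illustrate the point: interpolating an object whose `toString` is its own name yields the same text as writing the name literally. The sketch below uses a stand-in case object named `StringType`, an assumption made so the snippet runs without Spark on the classpath; it is not Spark's class, and the equivalence holds for the real one only if it prints the same way.

```scala
object InterpolationCheck {
  // Stand-in for illustration only; a Scala case object's toString is its name.
  case object StringType

  // The form used in the diff: string interpolation of the object.
  val interpolated: String = s"must be a $StringType"
  // The form the comment suggests: a plain literal.
  val literal: String = "must be a StringType"
}
```

The two values compare equal, so the plain literal avoids the interpolation at the cost of no longer following a rename of the object.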




[GitHub] spark issue #17308: [SPARK-19968][SPARK-20737][SS] Use a cached instance of ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #77225 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77225/testReport)** for PR 17308 at commit [`ef2d6cd`](https://github.com/apache/spark/commit/ef2d6cd4275d93518ec27d4b08916575a3e597d7).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76868/testReport)** for PR 17308 at commit [`932d563`](https://github.com/apache/spark/commit/932d5638feed4fd5c602da27129b5bc4b81ff897).




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76932/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Please take a look, @tcondie @zsxwing !




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r110685760
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    --- End diff --
    
    True, my bad. I thought `KafkaSink` was a public API.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76932 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76932/testReport)** for PR 17308 at commit [`e07e77e`](https://github.com/apache/spark/commit/e07e77e78c568a7cb5e68eb25e71e330a9f61962).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    LGTM, with a few minor comments.




[GitHub] spark pull request #17308: [SPARK-19968][SS] Use a cached instance of `Kafka...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r114842207
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.mutable
    +
    +import org.apache.kafka.clients.producer.KafkaProducer
    +
    +import org.apache.spark.internal.Logging
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private val cacheMap = new mutable.HashMap[Int, KafkaProducer[Array[Byte], Array[Byte]]]()
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.HashMap[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] = {
    +    val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] =
    +      new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    +    cacheMap.put(producerConfiguration.hashCode(), kafkaProducer)
    +    kafkaProducer
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. A new KafkaProducer will be created,
    +   * if matching KafkaProducer doesn't exist. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  def getOrCreate(kafkaParams: ju.HashMap[String, Object])
    +  : KafkaProducer[Array[Byte], Array[Byte]] = synchronized {
    +    // KafkaParams is a java.util.HashMap as it produces a new hashcode for
    +    // for every change in the contents of the HashMap and ensures m1 == m2 implies
    +    // m1.hashCode == m2.hashCode.
    +    log.debug(s"Created a new instance of KafkaProducer for $kafkaParams.")
    --- End diff --
    
    Maybe put this log statement in the `create` function instead.
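
A minimal sketch of the suggestion above, assuming the goal is for the "Created a new instance" message to appear only when a producer is actually built rather than on every `getOrCreate` call. `FakeProducer`, `LoggingPlacementSketch`, and `logLines` are hypothetical stand-ins (not part of the patch) so that the example runs without Kafka or Spark on the classpath.

```scala
import scala.collection.mutable

object LoggingPlacementSketch {
  // Hypothetical stand-in for KafkaProducer[Array[Byte], Array[Byte]].
  final class FakeProducer(val params: Map[String, String])

  // Collected log lines; the real code would call logDebug instead.
  val logLines = mutable.ArrayBuffer.empty[String]

  private val cache = mutable.HashMap.empty[Map[String, String], FakeProducer]

  // The "created" message lives here, so it is emitted only on a cache miss.
  private def create(params: Map[String, String]): FakeProducer = {
    val producer = new FakeProducer(params)
    cache.put(params, producer)
    logLines += s"Created a new instance of producer for $params."
    producer
  }

  // The default argument of getOrElse is by-name, so create runs only when the key is absent.
  def getOrCreate(params: Map[String, String]): FakeProducer = synchronized {
    cache.getOrElse(params, create(params))
  }
}
```

Calling `getOrCreate` twice with the same parameters returns the same instance and logs a single line.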




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117717831
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---
    @@ -94,4 +94,10 @@ private[kafka010] object KafkaWriter extends Logging {
           }
         }
       }
    +
    +  def close(sc: SparkContext, kafkaParams: ju.Map[String, Object]): Unit = {
    +    sc.parallelize(1 to 10000).foreachPartition { iter =>
    +      CachedKafkaProducer.close(kafkaParams)
    +    }
    --- End diff --
    
    Using a Guava cache, we can close a producer if it has not been used for a certain time. Shall we skip closing them during shutdown?
    In the particular case of the Kafka producer, I do not see a direct problem with that, since we do a producer.flush() on each batch. I was just wondering: with streaming sinks in general, what should our strategy be?
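
The idle-timeout idea in this comment can be illustrated with a small standard-library sketch. This is not Guava's implementation and not the code in the patch; `IdleCache`, `FakeProducer`, and the injected `nowMs` clock are hypothetical names used only so the behaviour can be run and checked without Guava or Kafka.

```scala
import scala.collection.mutable

object IdleExpirySketch {
  // Hypothetical stand-in for a producer; close() only records that it ran.
  final class FakeProducer {
    @volatile var closed = false
    def close(): Unit = { closed = true }
  }

  private final class Entry(val producer: FakeProducer, var lastAccessMs: Long)

  // Closes and drops entries that were not accessed within timeoutMs.
  // The clock is injected so that tests do not need to sleep.
  final class IdleCache(timeoutMs: Long, nowMs: () => Long) {
    private val entries = mutable.HashMap.empty[String, Entry]

    def getOrCreate(key: String): FakeProducer = synchronized {
      evictIdle()
      val entry = entries.getOrElseUpdate(key, new Entry(new FakeProducer, nowMs()))
      entry.lastAccessMs = nowMs() // every access refreshes the idle timer
      entry.producer
    }

    def evictIdle(): Unit = synchronized {
      val now = nowMs()
      // filter returns a copy, so removing from entries while iterating it is safe.
      val idle = entries.filter { case (_, e) => now - e.lastAccessMs > timeoutMs }
      idle.foreach { case (key, e) =>
        e.producer.close()
        entries.remove(key)
      }
    }
  }
}
```

With this shape, a producer that keeps being used is never closed, while one left idle past the timeout is closed on the next eviction pass. Whether anything must also be closed at shutdown remains the open question raised in the comment.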




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    **[Test build #76932 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76932/testReport)** for PR 17308 at commit [`e07e77e`](https://github.com/apache/spark/commit/e07e77e78c568a7cb5e68eb25e71e330a9f61962).




[GitHub] spark pull request #17308: [SPARK-19968][SPARK-20737][SS] Use a cached insta...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r118389504
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.SortedMap
    +import scala.collection.mutable
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.ShutdownHookManager
    +
    +private[kafka010] object CachedKafkaProducer extends Logging {
    +
    +  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
    +
    +  private lazy val cacheExpireTimeout: Long =
    +    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
    +
    +  private val removalListener = new RemovalListener[String, Producer]() {
    +    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer: Producer = notification.getValue
    +      logDebug(s"Evicting kafka producer $producer uid: $uid, due to ${notification.getCause}")
    +      close(uid, producer)
    +    }
    +  }
    +
    +  private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    +    .removalListener(removalListener)
    +    .build[String, Producer]()
    +
    +  ShutdownHookManager.addShutdownHook { () =>
    +    clear()
    +  }
    +
    +  private def createKafkaProducer(
    +    producerConfiguration: ju.Map[String, Object]): Producer = {
    +    val uid = getUniqueId(producerConfiguration)
    +    val kafkaProducer: Producer = new Producer(producerConfiguration)
    +    guavaCache.put(uid.toString, kafkaProducer)
    +    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
    +    kafkaProducer
    +  }
    +
    +  private def getUniqueId(kafkaParams: ju.Map[String, Object]): String = {
    +    val uid = kafkaParams.get(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId)
    +    assert(uid != null, s"KafkaParams($kafkaParams) not canonicalized.")
    +    uid.toString
    +  }
    +
    +  /**
    +   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
    +   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
    +   * one instance per specified kafkaParams.
    +   */
    +  private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = synchronized {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else {
    +      kafkaParams
    +    }
    +    val uid = getUniqueId(params)
    +    Option(guavaCache.getIfPresent(uid)).getOrElse(createKafkaProducer(params))
    +  }
    +
    +  /** For explicitly closing kafka producer */
    +  private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = {
    +    val params = if (!CanonicalizeKafkaParams.isCanonicalized(kafkaParams)) {
    +      CanonicalizeKafkaParams.computeUniqueCanonicalForm(kafkaParams)
    +    } else kafkaParams
    +    val uid = getUniqueId(params)
    +    guavaCache.invalidate(uid)
    +  }
    +
    +  /** Auto close on cache evict */
    +  private def close(uid: String, producer: Producer): Unit = {
    +    try {
    +      val outcome = CanonicalizeKafkaParams.remove(
    +        new ju.HashMap[String, Object](
    +          Map(CanonicalizeKafkaParams.sparkKafkaParamsUniqueId -> uid).asJava))
    +      logDebug(s"Removed kafka params from cache: $outcome.")
    +      logInfo(s"Closing the KafkaProducer with uid: $uid.")
    +      producer.close()
    +    } catch {
    +      case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
    +    }
    +  }
    +
    +  private def clear(): Unit = {
    +    logInfo("Cleaning up guava cache.")
    +    guavaCache.invalidateAll()
    +  }
    +
    +  // Intended for testing purpose only.
    +  private def getAsMap: ConcurrentMap[String, Producer] = guavaCache.asMap()
    +}
    +
    +/**
    + * Canonicalize kafka params i.e. append a unique internal id to kafka params, if it already does
    + * not exist. This is done to ensure, we have only one set of kafka parameters associated with a
    + * unique ID.
    + */
    +private[kafka010] object CanonicalizeKafkaParams extends Logging {
    --- End diff --
    
    This also seems kind of complicated. We know these always come from `Data[Stream/Frame]Writer`, which will always give you a `Map[String, String]` (and we expect the number of options here to be small). Could we just make the key for the cache a sorted `Seq[(String, String)]` rather than invent another GUID?
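
A rough sketch of what the sorted-key suggestion might look like, assuming the options arrive as a `Map[String, String]`. `SortedKeyCacheSketch`, `FakeProducer`, and `toCacheKey` are hypothetical names introduced for illustration; the real cache would hold Kafka producers rather than this stand-in.

```scala
import scala.collection.mutable

object SortedKeyCacheSketch {
  // Hypothetical stand-in for KafkaProducer[Array[Byte], Array[Byte]].
  final class FakeProducer(val params: Seq[(String, String)])

  private val cache = mutable.HashMap.empty[Seq[(String, String)], FakeProducer]

  // Sorting by option name gives one canonical key per set of options,
  // regardless of the order in which the options were supplied.
  def toCacheKey(params: Map[String, String]): Seq[(String, String)] =
    params.toSeq.sortBy(_._1)

  def getOrCreate(params: Map[String, String]): FakeProducer = synchronized {
    val key = toCacheKey(params)
    cache.getOrElseUpdate(key, new FakeProducer(key))
  }

  def size: Int = synchronized(cache.size)
}
```

Because the key is compared by value, two option maps with the same contents share one cached instance, and no separate unique id has to be generated or tracked.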




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76868/
    Test PASSed.




[GitHub] spark issue #17308: [SPARK-19968][SS] Use a cached instance of `KafkaProduce...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17308
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77358/
    Test FAILed.

