Posted to commits@spark.apache.org by do...@apache.org on 2019/05/15 17:42:32 UTC

[spark] branch master updated: [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new efa3035  [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
efa3035 is described below

commit efa303581ac61d6f517aacd08883da2d01530bd2
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Wed May 15 10:42:09 2019 -0700

    [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
    
    ## What changes were proposed in this pull request?
    
    Kafka-related Spark parameters have to start with `spark.kafka.` and not with `spark.sql.`. Because of this, I've renamed `spark.sql.kafkaConsumerCache.capacity` to `spark.kafka.consumer.cache.capacity`.
    
    Since Kafka consumer caching was not documented, I've added documentation for it as well.
    
    ## How was this patch tested?
    
    Existing + added unit test.
    
    ```
    cd docs
    SKIP_API=1 jekyll build
    ```
    and manual webpage check.
    
    Closes #24590 from gaborgsomogyi/SPARK-27687.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
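
For reference, a minimal sketch (not part of the patch) of how the renamed key behaves with
`SparkConf`: the new key can be set directly, and the deprecated
`spark.sql.kafkaConsumerCache.capacity` key is still resolved to it (SparkConf logs a
deprecation warning). The capacity values below are arbitrary illustrations; only the two
configuration keys come from this commit.

```
import org.apache.spark.SparkConf

// Sketch only: arbitrary values, just to illustrate the renamed key and its
// deprecated alternative.
object ConsumerCacheCapacityConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.kafka.consumer.cache.capacity", "128") // new key

    println(conf.get("spark.kafka.consumer.cache.capacity")) // 128

    val legacy = new SparkConf()
      .set("spark.sql.kafkaConsumerCache.capacity", "32") // deprecated key
    // The alternate-config mapping resolves the old key to the new one.
    println(legacy.get("spark.kafka.consumer.cache.capacity")) // 32
  }
}
```
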
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +++-
 docs/structured-streaming-kafka-integration.md     | 18 ++++++++++++++
 .../org/apache/spark/sql/kafka010/package.scala    |  5 ++--
 .../spark/sql/kafka010/KafkaSparkConfSuite.scala}  | 28 +++++++---------------
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 240707e..aa93f42 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -714,7 +714,9 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
     KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
       AlternateConfig("spark.yarn.access.namenodes", "2.2"),
-      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
+      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")),
+    "spark.kafka.consumer.cache.capacity" -> Seq(
+      AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0"))
   )
 
   /**
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 4a295e0..bbff822 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -416,6 +416,24 @@ The following configurations are optional:
 </tr>
 </table>
 
+### Consumer Caching
+
+It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
+Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+* Topic name
+* Topic partition
+* Group ID
+
+The size of the cache is limited by <code>spark.kafka.consumer.cache.capacity</code> (default: 64).
+When this threshold is reached, Spark tries to remove the least-used entry that is not currently in use.
+If no such entry can be removed, the cache keeps growing. In the worst case, it grows to the maximum
+number of concurrent tasks that can run in the executor (that is, the number of task slots), after
+which it never shrinks.
+
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, the cached Kafka consumer that was used in the failed execution is invalidated. Note that it is
+not closed if any other task is still using it.
+
 ## Writing Data to Kafka
 
 Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
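
For reference, a minimal sketch (not part of the patch) of a Structured Streaming job that raises
the soft limit documented above. The broker address `host1:9092` and topic `topic1` are
placeholders; the same setting can also be passed to `spark-submit` via
`--conf spark.kafka.consumer.cache.capacity=128`.

```
import org.apache.spark.sql.SparkSession

// Sketch only: placeholder broker and topic, arbitrary capacity value.
object KafkaConsumerCacheExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-consumer-cache-example")
      // Soft limit on cached Kafka consumers per executor (default: 64).
      .config("spark.kafka.consumer.cache.capacity", "128")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```
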
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 115ec44..ff19862 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -33,8 +33,9 @@ package object kafka010 {   // scalastyle:ignore
       .createWithDefaultString("10m")
 
   private[kafka010] val CONSUMER_CACHE_CAPACITY =
-    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
-      .doc("The size of consumers cached.")
+    ConfigBuilder("spark.kafka.consumer.cache.capacity")
+      .doc("The maximum number of consumers cached. Please note it's a soft limit" +
+        " (check Structured Streaming Kafka integration guide for further details).")
       .intConf
       .createWithDefault(64)
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
similarity index 50%
copy from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
copy to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
index 115ec44..ca8b8b6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
@@ -14,27 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
 
-import java.util.concurrent.TimeUnit
+package org.apache.spark.sql.kafka010
 
-import org.apache.kafka.common.TopicPartition
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.util.ResetSystemProperties
 
-import org.apache.spark.internal.config.ConfigBuilder
+class KafkaSparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
+  test("deprecated configs") {
+    val conf = new SparkConf()
 
-package object kafka010 {   // scalastyle:ignore
-  // ^^ scalastyle:ignore is for ignoring warnings about digits in package name
-  type PartitionOffsetMap = Map[TopicPartition, Long]
-
-  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
-    ConfigBuilder("spark.kafka.producer.cache.timeout")
-      .doc("The expire time to remove the unused producers.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("10m")
-
-  private[kafka010] val CONSUMER_CACHE_CAPACITY =
-    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
-      .doc("The size of consumers cached.")
-      .intConf
-      .createWithDefault(64)
+    conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
+    assert(conf.get(CONSUMER_CACHE_CAPACITY) === 32)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org