Posted to commits@spark.apache.org by do...@apache.org on 2020/01/13 07:20:05 UTC

[spark] branch master updated: [SPARK-21869][SS][DOCS][FOLLOWUP] Document Kafka producer pool configuration

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eefcc7d  [SPARK-21869][SS][DOCS][FOLLOWUP] Document Kafka producer pool configuration
eefcc7d is described below

commit eefcc7d762a627bf19cab7041a1a82f88862e7e1
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Sun Jan 12 23:19:37 2020 -0800

    [SPARK-21869][SS][DOCS][FOLLOWUP] Document Kafka producer pool configuration
    
    ### What changes were proposed in this pull request?
    
    This patch documents the configuration for the Kafka producer pool, newly revised via SPARK-21869 (#26845).
    
    ### Why are the changes needed?
    
    The explanation of the new Kafka producer pool configuration is missing, whereas the doc already covers the Kafka consumer pool configuration.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. This is a documentation change.
    
    ![Screen Shot 2020-01-12 at 11 16 19 PM](https://user-images.githubusercontent.com/9700541/72238148-c8959e00-3591-11ea-87fc-a8918792017e.png)
    
    ### How was this patch tested?
    
    N/A
    
    Closes #27146 from HeartSaVioR/SPARK-21869-FOLLOWUP.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 docs/structured-streaming-kafka-integration.md | 32 ++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index e939e83..0820b38 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -539,7 +539,7 @@ The following properties are available to configure the consumer pool:
 <tr>
   <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
-  <td>1m (1 minutes)</td>
+  <td>1m (1 minute)</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.jmx.enable</td>
@@ -580,7 +580,7 @@ The following properties are available to configure the fetched data pool:
 <tr>
   <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
-  <td>1m (1 minutes)</td>
+  <td>1m (1 minute)</td>
 </tr>
 </table>
 
@@ -802,6 +802,34 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
 </div>
 </div>
 
+### Producer Caching
+
+Because a Kafka producer instance is designed to be thread-safe, Spark initializes a single Kafka producer instance and shares it across tasks that use the same caching key.
+
+The caching key is built up from the following information:
+
+* Kafka producer configuration
+
+This includes configuration for authorization, which Spark automatically includes when a delegation token is in use. Even with authorization taken into account, you can expect the same Kafka producer instance to be used for the same Kafka producer configuration.
+Spark uses a different Kafka producer when the delegation token is renewed; the Kafka producer instance for the old delegation token is evicted according to the cache policy.
+
+The following properties are available to configure the producer pool:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td>spark.kafka.producer.cache.timeout</td>
+  <td>10m (10 minutes)</td>
+  <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
+</tr>
+<tr>
+  <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
+  <td>1m (1 minute)</td>
+  <td>The interval of time between runs of the idle evictor thread for the producer pool. When non-positive, no idle evictor thread will be run.</td>
+</tr>
+</table>
+
+The idle eviction thread periodically removes producers that have not been used for longer than the given timeout. Note that a producer is shared and used concurrently, so its last-used timestamp is set at the moment the producer instance is returned and its reference count reaches 0.
 
 ## Kafka Specific Configurations
 
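Note (not part of the commit): the two producer pool properties documented in the diff above are ordinary Spark configuration entries, so one plausible way to override them is with `--conf` flags at submit time. The sketch below only builds those flags as a list of strings. The helper name `to_submit_args` and the values `5m` and `30s` are illustrative assumptions, not recommendations from the patch.

```python
# Illustrative overrides for the producer pool properties named in the diff.
# The values are assumptions chosen for the example, not suggested defaults.
PRODUCER_POOL_CONF = {
    "spark.kafka.producer.cache.timeout": "5m",
    "spark.kafka.producer.cache.evictorThreadRunInterval": "30s",
}


def to_submit_args(conf):
    """Hypothetical helper: render a config dict as spark-submit --conf flags."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Prints the flags on one line so they can be pasted after `spark-submit`.
    print(" ".join(to_submit_args(PRODUCER_POOL_CONF)))
```

Keeping the settings in a dict makes it straightforward to pass the same pairs to a `SparkConf` or session builder instead, if that fits the deployment better.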

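Note (not part of the commit): the eviction rule described in the new "Producer Caching" section can be illustrated with a small reference-counted pool. This is a minimal single-threaded sketch, not Spark's implementation. The class `ProducerPoolSketch`, its method names, and the injectable clock are all hypothetical, and a real pool would also need locking and would close evicted producers.

```python
import time


class ProducerPoolSketch:
    """Toy pool: one shared instance per configuration, evicted when idle too long."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self._timeout_s = timeout_s
        self._clock = clock  # injectable so the behavior can be checked without sleeping
        self._entries = {}   # caching key -> {"producer", "refs", "last_used"}

    def acquire(self, config, factory):
        # The caching key is derived from the producer configuration only.
        key = tuple(sorted(config.items()))
        entry = self._entries.get(key)
        if entry is None:
            entry = {"producer": factory(config), "refs": 0, "last_used": self._clock()}
            self._entries[key] = entry
        entry["refs"] += 1
        return key, entry["producer"]

    def release(self, key):
        entry = self._entries[key]
        entry["refs"] -= 1
        if entry["refs"] == 0:
            # The last-used timestamp is only set once nobody holds the producer.
            entry["last_used"] = self._clock()

    def evict_idle(self):
        # One pass of the evictor: drop unreferenced entries idle longer than the timeout.
        now = self._clock()
        stale = [
            key for key, entry in self._entries.items()
            if entry["refs"] == 0 and now - entry["last_used"] > self._timeout_s
        ]
        for key in stale:
            del self._entries[key]
        return len(stale)
```

In this sketch a producer that is still held by any caller is never evicted, and the idle timer starts only when the last holder releases it, which mirrors the wording of the documentation paragraph.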
