Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/26 11:12:11 UTC

[spark] branch master updated: [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d286b  [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
35d286b is described below

commit 35d286bafb248a47a1125908c0208cd759dd0416
Author: beliefer <be...@163.com>
AuthorDate: Thu Mar 26 20:11:15 2020 +0900

    [SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
    
    ### What changes were proposed in this pull request?
    Add version information to the configuration of Kafka.
    
    I sorted out the relevant information, shown in the table below; a minimal sketch of the resulting builder pattern follows the table.
    
    Item name | Since version | JIRA ID | Commit ID | Note
    -- | -- | -- | -- | --
    spark.streaming.kafka.consumer.cache.enabled | 2.2.1 | SPARK-19185 | 02cf178bb2a7dc8b4c06eb040c44b6453e41ed15#diff-c465bbcc83b2ecc7530d1c0128e4432b |  
    spark.streaming.kafka.consumer.poll.ms | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |  
    spark.streaming.kafka.consumer.cache.initialCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |  
    spark.streaming.kafka.consumer.cache.maxCapacity | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |  
    spark.streaming.kafka.consumer.cache.loadFactor | 2.0.1 | SPARK-12177 | 3134f116a3565c3a299fa2e7094acd7304d64280#diff-4597d93a0e951f7199697dba7dd0dc32 |  
    spark.streaming.kafka.maxRatePerPartition | 1.3.0 | SPARK-4964 | a119cae48030520da9f26ee9a1270bed7f33031e#diff-26cb4369f86050dc2e75cd16291b2844 |  
    spark.streaming.kafka.minRatePerPartition | 2.4.0 | SPARK-25233 | 135ff16a3510a4dfb3470904004dae9848005019#diff-815f6ec5caf9e4beb355f5f981171f1f |  
    spark.streaming.kafka.allowNonConsecutiveOffsets | 2.3.1 | SPARK-24067 | 1d598b771de3b588a2f377ae7ccf8193156641f2#diff-4597d93a0e951f7199697dba7dd0dc32 |  
    spark.kafka.producer.cache.timeout | 2.2.1 | SPARK-19968 | f6730a70cb47ebb3df7f42209df7b076aece1093#diff-ac8844e8d791a75aaee3d0d10bfc1f2a |  
    spark.kafka.producer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-21869 | 7bff2db9ed803e05a43c2d875c1dea819d81248a#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.cache.capacity | 3.0.0 | SPARK-27687 | efa303581ac61d6f517aacd08883da2d01530bd2#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.cache.jmx.enable | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.fetchedData.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5a3ece0e913949c7160bb4925e5d289e44#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |  
    spark.kafka.clusters.${cluster}.auth.bootstrap.servers | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.security.protocol | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.sasl.kerberos.service.name | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.ssl.truststore.location | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.ssl.truststore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.ssl.keystore.location | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.ssl.keystore.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.ssl.key.password | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
    spark.kafka.clusters.${cluster}.sasl.token.mechanism | 3.0.0 | SPARK-27294 | 2f558094257c38d26650049f2ac93be6d65d6d85#diff-7df71bd47f5a3428ebdb05ced3c31f49 |  
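    
    As a minimal sketch of the pattern this PR applies (mirroring the DStreams entries in the diff below), each `ConfigBuilder` chain gains a `.version()` call that records the release in which the config was introduced:
    
    ```scala
    package org.apache.spark.streaming.kafka010
    
    import org.apache.spark.internal.config.ConfigBuilder
    
    // Sketch only: mirrors one of the entries touched in the diff below.
    object VersionedConfigSketch {
      private[spark] val MAX_RATE_PER_PARTITION =
        ConfigBuilder("spark.streaming.kafka.maxRatePerPartition")
          .version("1.3.0")          // SPARK-4964, per the table above
          .longConf
          .createWithDefault(0)
    }
    ```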
    
    ### Why are the changes needed?
    Supplements the version information for the Kafka configurations.
    
    ### Does this PR introduce any user-facing change?
    'No'.
    
    ### How was this patch tested?
    Existing unit tests.
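    
    As a hedged illustration (not part of this PR), the recorded versions could be spot-checked against the table above, assuming the resulting `ConfigEntry` exposes the value through a `version` field as the builder API suggests:
    
    ```scala
    package org.apache.spark.streaming.kafka010
    
    // Hypothetical spot check of the version metadata added in this PR.
    // Assumes ConfigEntry exposes the recorded release via a `version` field.
    object KafkaConfigVersionCheck {
      def main(args: Array[String]): Unit = {
        assert(MAX_RATE_PER_PARTITION.version == "1.3.0")   // SPARK-4964
        assert(MIN_RATE_PER_PARTITION.version == "2.4.0")   // SPARK-25233
        println("Kafka config version metadata matches the table above.")
      }
    }
    ```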
    
    Closes #27989 from beliefer/add-version-to-kafka-config.
    
    Authored-by: beliefer <be...@163.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 docs/configuration.md                              |  2 ++
 docs/structured-streaming-kafka-integration.md     | 26 +++++++++++++---
 .../org/apache/spark/sql/kafka010/package.scala    |  8 +++++
 .../apache/spark/streaming/kafka010/package.scala  | 36 +++++++++++++---------
 4 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 9e5f5b6..a7a1477 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2661,6 +2661,7 @@ Spark subsystems.
     <a href="streaming-kafka-0-10-integration.html">Kafka Integration guide</a>
     for more details.
   </td>
+  <td>1.3.0</td>
 </tr>
 <tr>
   <td><code>spark.streaming.kafka.minRatePerPartition</code></td>
@@ -2669,6 +2670,7 @@ Spark subsystems.
     Minimum rate (number of records per second) at which data will be read from each Kafka
     partition when using the new Kafka direct stream API.
   </td>
+  <td>2.4.0</td>
 </tr>
 <tr>
   <td><code>spark.streaming.ui.retainedBatches</code></td>
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a1eeee5..016faa7 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -525,21 +525,24 @@ The caching key is built up from the following information:
 The following properties are available to configure the consumer pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.consumer.cache.capacity</td>
   <td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
   <td>64</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.timeout</td>
   <td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>5m (5 minutes)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.jmx.enable</td>
@@ -547,6 +550,7 @@ The following properties are available to configure the consumer pool:
   The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
   </td>
   <td>false</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -571,16 +575,18 @@ Note that it doesn't leverage Apache Commons Pool due to the difference of chara
 The following properties are available to configure the fetched data pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.consumer.fetchedData.cache.timeout</td>
   <td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>5m (5 minutes)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -816,16 +822,18 @@ It will use different Kafka producer when delegation token is renewed; Kafka pro
 The following properties are available to configure the producer pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.producer.cache.timeout</td>
   <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>10m (10 minutes)</td>
+  <td>2.2.1</td>
 </tr>
 <tr>
   <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -935,7 +943,7 @@ When none of the above applies then unsecure connection assumed.
 Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations.
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
     <td>None</td>
@@ -943,6 +951,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       A list of coma separated host/port pairs to use for establishing the initial connection
       to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
@@ -953,6 +962,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       If multiple clusters match the address, an exception will be thrown and the query won't be started.
       Kafka's secure and unsecure listeners are bound to different ports. When both used the secure listener port has to be part of the regular expression.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
@@ -962,6 +972,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
       and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
@@ -970,6 +981,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
       For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
@@ -977,6 +989,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     <td>
       The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
@@ -985,6 +998,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured.
       For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
@@ -993,6 +1007,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       The location of the key store file. This is optional for client and can be used for two-way authentication for client.
       For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
@@ -1001,6 +1016,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured.
       For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
@@ -1009,6 +1025,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       The password of the private key in the key store file. This is optional for client.
       For further details please see Kafka documentation. Only used to obtain delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
   <tr>
     <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
@@ -1017,6 +1034,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
       SASL mechanism used for client connections with delegation token. Because SCRAM login module used for authentication a compatible mechanism has to be set here.
       For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token.
     </td>
+    <td>3.0.0</td>
   </tr>
 </table>
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 460bb8b..b7e42c0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -29,6 +29,7 @@ package object kafka010 {   // scalastyle:ignore
   private[kafka010] val PRODUCER_CACHE_TIMEOUT =
     ConfigBuilder("spark.kafka.producer.cache.timeout")
       .doc("The expire time to remove the unused producers.")
+      .version("2.2.1")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10m")
 
@@ -36,6 +37,7 @@ package object kafka010 {   // scalastyle:ignore
     ConfigBuilder("spark.kafka.producer.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for producer pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 
@@ -43,12 +45,14 @@ package object kafka010 {   // scalastyle:ignore
     ConfigBuilder("spark.kafka.consumer.cache.capacity")
       .doc("The maximum number of consumers cached. Please note it's a soft limit" +
         " (check Structured Streaming Kafka integration guide for further details).")
+      .version("3.0.0")
       .intConf
       .createWithDefault(64)
 
   private[kafka010] val CONSUMER_CACHE_JMX_ENABLED =
     ConfigBuilder("spark.kafka.consumer.cache.jmx.enable")
       .doc("Enable or disable JMX for pools created with this configuration instance.")
+      .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
 
@@ -57,6 +61,7 @@ package object kafka010 {   // scalastyle:ignore
       .doc("The minimum amount of time a consumer may sit idle in the pool before " +
         "it is eligible for eviction by the evictor. " +
         "When non-positive, no consumers will be evicted from the pool due to idle time alone.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5m")
 
@@ -64,6 +69,7 @@ package object kafka010 {   // scalastyle:ignore
     ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for consumer pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 
@@ -72,6 +78,7 @@ package object kafka010 {   // scalastyle:ignore
       .doc("The minimum amount of time a fetched data may sit idle in the pool before " +
         "it is eligible for eviction by the evictor. " +
         "When non-positive, no fetched data will be evicted from the pool due to idle time alone.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5m")
 
@@ -79,6 +86,7 @@ package object kafka010 {   // scalastyle:ignore
     ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for fetched data pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 }
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 3d2921f..0679a49 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -26,43 +26,51 @@ package object kafka010 { //scalastyle:ignore
 
   private[spark] val CONSUMER_CACHE_ENABLED =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.enabled")
+      .version("2.2.1")
       .booleanConf
       .createWithDefault(true)
 
   private[spark] val CONSUMER_POLL_MS =
     ConfigBuilder("spark.streaming.kafka.consumer.poll.ms")
-    .longConf
-    .createOptional
+      .version("2.0.1")
+      .longConf
+      .createOptional
 
   private[spark] val CONSUMER_CACHE_INITIAL_CAPACITY =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.initialCapacity")
-    .intConf
-    .createWithDefault(16)
+      .version("2.0.1")
+      .intConf
+      .createWithDefault(16)
 
   private[spark] val CONSUMER_CACHE_MAX_CAPACITY =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.maxCapacity")
-    .intConf
-    .createWithDefault(64)
+      .version("2.0.1")
+      .intConf
+      .createWithDefault(64)
 
   private[spark] val CONSUMER_CACHE_LOAD_FACTOR =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.loadFactor")
-    .doubleConf
-    .createWithDefault(0.75)
+      .version("2.0.1")
+      .doubleConf
+      .createWithDefault(0.75)
 
   private[spark] val MAX_RATE_PER_PARTITION =
     ConfigBuilder("spark.streaming.kafka.maxRatePerPartition")
-    .longConf
-    .createWithDefault(0)
+      .version("1.3.0")
+      .longConf
+      .createWithDefault(0)
 
   private[spark] val MIN_RATE_PER_PARTITION =
     ConfigBuilder("spark.streaming.kafka.minRatePerPartition")
-    .longConf
-    .createWithDefault(1)
+      .version("2.4.0")
+      .longConf
+      .createWithDefault(1)
 
   private[spark] val ALLOW_NON_CONSECUTIVE_OFFSETS =
     ConfigBuilder("spark.streaming.kafka.allowNonConsecutiveOffsets")
-    .booleanConf
-    .createWithDefault(false)
+      .version("2.3.1")
+      .booleanConf
+      .createWithDefault(false)
 
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org