Posted to commits@spark.apache.org by do...@apache.org on 2019/05/15 17:42:32 UTC
[spark] branch master updated: [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new efa3035 [SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
efa3035 is described below
commit efa303581ac61d6f517aacd08883da2d01530bd2
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Wed May 15 10:42:09 2019 -0700
[SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?
Kafka-related Spark parameters have to start with `spark.kafka.`, not with `spark.sql.`. Because of this, I've renamed `spark.sql.kafkaConsumerCache.capacity` to `spark.kafka.consumer.cache.capacity`.
Since Kafka consumer caching was not documented, I've added documentation for it as well.
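As a minimal sketch of the backward compatibility (using only the public `SparkConf` string API; this snippet is illustrative and not part of the commit):
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// The deprecated key is still honored: SparkConf resolves it to the new
// canonical key through the AlternateConfig entry added in this commit,
// and logs a deprecation warning when the old name is used.
conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
assert(conf.get("spark.kafka.consumer.cache.capacity") == "32")
```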
## How was this patch tested?
Existing + added unit test.
```
cd docs
SKIP_API=1 jekyll build
```
and manual webpage check.
Closes #24590 from gaborgsomogyi/SPARK-27687.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../main/scala/org/apache/spark/SparkConf.scala | 4 +++-
docs/structured-streaming-kafka-integration.md | 18 ++++++++++++++
.../org/apache/spark/sql/kafka010/package.scala | 5 ++--
.../spark/sql/kafka010/KafkaSparkConfSuite.scala} | 28 +++++++---------------
4 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 240707e..aa93f42 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -714,7 +714,9 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2"),
- AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
+ AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")),
+ "spark.kafka.consumer.cache.capacity" -> Seq(
+ AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0"))
)
/**
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 4a295e0..bbff822 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -416,6 +416,24 @@ The following configurations are optional:
</tr>
</table>
+### Consumer Caching
+
+It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
+Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+* Topic name
+* Topic partition
+* Group ID
+
+The size of the cache is limited by <code>spark.kafka.consumer.cache.capacity</code> (default: 64).
+When this threshold is reached, Spark tries to remove the least-used entry that is currently not in use.
+If none can be removed, the cache keeps growing. In the worst case the cache grows to the maximum
+number of concurrent tasks that can run in the executor (that is, the number of task slots),
+after which it never shrinks.
+
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, the cached Kafka consumer which was used in the failed execution is invalidated. Note that
+it is not closed while any other task is still using it.
+
## Writing Data to Kafka
Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
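For context, the capacity setting documented above is applied like any other Spark conf when the session is built; a minimal sketch in the style of the integration guide's examples (application name, broker addresses, and topic are placeholders, not part of this commit):
```scala
import org.apache.spark.sql.SparkSession

// Placeholder app name, brokers, and topic; the capacity of 128 simply
// raises the soft limit above the default of 64.
val spark = SparkSession.builder()
  .appName("kafka-consumer-cache-example")
  .config("spark.kafka.consumer.cache.capacity", "128")
  .getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
```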
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 115ec44..ff19862 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -33,8 +33,9 @@ package object kafka010 { // scalastyle:ignore
.createWithDefaultString("10m")
private[kafka010] val CONSUMER_CACHE_CAPACITY =
- ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
- .doc("The size of consumers cached.")
+ ConfigBuilder("spark.kafka.consumer.cache.capacity")
+ .doc("The maximum number of consumers cached. Please note it's a soft limit" +
+ " (check Structured Streaming Kafka integration guide for further details).")
.intConf
.createWithDefault(64)
}
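For completeness, a sketch of how such a typed entry is read (hypothetical call site; the consumer code that consults it is not shown in this diff):
```scala
import org.apache.spark.SparkConf

// Hypothetical call site. SparkConf.get(ConfigEntry) is private[spark],
// so this only compiles inside Spark's own packages; it returns the
// declared default of 64 when neither the new nor the deprecated key is set.
val capacity: Int = new SparkConf().get(CONSUMER_CACHE_CAPACITY)
```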
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
similarity index 50%
copy from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
copy to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
index 115ec44..ca8b8b6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala
@@ -14,27 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql
-import java.util.concurrent.TimeUnit
+package org.apache.spark.sql.kafka010
-import org.apache.kafka.common.TopicPartition
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.util.ResetSystemProperties
-import org.apache.spark.internal.config.ConfigBuilder
+class KafkaSparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
+ test("deprecated configs") {
+ val conf = new SparkConf()
-package object kafka010 { // scalastyle:ignore
- // ^^ scalastyle:ignore is for ignoring warnings about digits in package name
- type PartitionOffsetMap = Map[TopicPartition, Long]
-
- private[kafka010] val PRODUCER_CACHE_TIMEOUT =
- ConfigBuilder("spark.kafka.producer.cache.timeout")
- .doc("The expire time to remove the unused producers.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("10m")
-
- private[kafka010] val CONSUMER_CACHE_CAPACITY =
- ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
- .doc("The size of consumers cached.")
- .intConf
- .createWithDefault(64)
+ conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
+ assert(conf.get(CONSUMER_CACHE_CAPACITY) === 32)
+ }
}