You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/07/02 12:38:31 UTC
[spark] branch master updated: [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a006c85 [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
a006c85 is described below
commit a006c85077d055144ef6d2a6d878dcc8b79b65c0
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Tue Jul 2 20:37:52 2019 +0800
[SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
## What changes were proposed in this pull request?
According to the documentation, `groupIdPrefix` should be available for both streaming and batch queries.
This is not the case, because the batch part is missing.
In this PR I've added:
* Structured Streaming tests for v1 and v2 covering `groupIdPrefix`
* Batch tests for v1 and v2 covering `groupIdPrefix`
* `groupIdPrefix` support in the batch code path
## How was this patch tested?
Additional and existing unit tests.
Closes #25030 from gaborgsomogyi/SPARK-28232.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/kafka010/KafkaBatch.scala | 2 +-
.../apache/spark/sql/kafka010/KafkaRelation.scala | 2 +-
.../spark/sql/kafka010/KafkaSourceProvider.scala | 6 ++++--
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 22 ++++++++++++++++----
.../spark/sql/kafka010/KafkaRelationSuite.scala | 24 +++++++++++++++++-----
5 files changed, 43 insertions(+), 13 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
index 6844905..e3c8536 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
@@ -47,7 +47,7 @@ private[kafka010] class KafkaBatch(
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
- val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
+ val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)
val kafkaOffsetReader = new KafkaOffsetReader(
strategy,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 495b3d2..b2950cf 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -55,7 +55,7 @@ private[kafka010] class KafkaRelation(
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
- val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
+ val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)
val kafkaOffsetReader = new KafkaOffsetReader(
strategy,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c2f3170..a3ea918 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -590,8 +590,10 @@ private[kafka010] object KafkaSourceProvider extends Logging {
* Returns a unique batch consumer group (group.id), allowing the user to set the prefix of
* the consumer group
*/
- private[kafka010] def batchUniqueGroupId(): String = {
- s"spark-kafka-relation-${UUID.randomUUID}"
+ private[kafka010] def batchUniqueGroupId(parameters: Map[String, String]): String = {
+ val groupIdPrefix = parameters
+ .getOrElse(GROUP_ID_PREFIX, "spark-kafka-relation")
+ s"${groupIdPrefix}-${UUID.randomUUID}"
}
/**
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 4adee49..3d14ebe 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -660,7 +660,23 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}
+ test("allow group.id prefix") {
+ testGroupId("groupIdPrefix", (expected, actual) => {
+ assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected),
+ "Valid consumer groups don't contain the expected group id - " +
+ s"Valid consumer groups: $actual / expected group id: $expected")
+ })
+ }
+
test("allow group.id override") {
+ testGroupId("kafka.group.id", (expected, actual) => {
+ assert(actual.exists(_ === expected), "Valid consumer groups don't " +
+ s"contain the expected group id - Valid consumer groups: $actual / " +
+ s"expected group id: $expected")
+ })
+ }
+
+ private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
// Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
// as well as KafkaOffsetReader.createConsumer(.)
val topic = newTopic()
@@ -673,7 +689,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val dsKafka = spark
.readStream
.format("kafka")
- .option("kafka.group.id", customGroupId)
+ .option(groupIdKey, customGroupId)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
@@ -689,9 +705,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
- assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
- s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
- s"expected group id: $customGroupId")
+ validateGroupId(customGroupId, validGroupsId)
}
)
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index a49985c..84d1ab6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -247,7 +247,23 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
}
- test("allow group.id overriding") {
+ test("allow group.id prefix") {
+ testGroupId("groupIdPrefix", (expected, actual) => {
+ assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected),
+ "Valid consumer groups don't contain the expected group id - " +
+ s"Valid consumer groups: $actual / expected group id: $expected")
+ })
+ }
+
+ test("allow group.id override") {
+ testGroupId("kafka.group.id", (expected, actual) => {
+ assert(actual.exists(_ === expected), "Valid consumer groups don't " +
+ s"contain the expected group id - Valid consumer groups: $actual / " +
+ s"expected group id: $expected")
+ })
+ }
+
+ private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
// Tests code path KafkaSourceProvider.createRelation(.)
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
@@ -256,15 +272,13 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
val customGroupId = "id-" + Random.nextInt()
- val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
+ val df = createDF(topic, withOptions = Map(groupIdKey -> customGroupId))
checkAnswer(df, (1 to 30).map(_.toString).toDF())
val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
- assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
- s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
- s"expected group id: $customGroupId")
+ validateGroupId(customGroupId, validGroupsId)
}
test("read Kafka transactional messages: read_committed") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org