Posted to commits@spark.apache.org by we...@apache.org on 2019/07/02 12:38:31 UTC

[spark] branch master updated: [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a006c85  [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
a006c85 is described below

commit a006c85077d055144ef6d2a6d878dcc8b79b65c0
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Tue Jul 2 20:37:52 2019 +0800

    [SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
    
    ## What changes were proposed in this pull request?
    
    According to the documentation, `groupIdPrefix` should be available for `streaming and batch`.
    This is not the case, because the batch implementation is missing.
    
    In this PR I've added:
    * Structured Streaming tests for v1 and v2 to cover `groupIdPrefix`
    * Batch tests for v1 and v2 to cover `groupIdPrefix`
    * `groupIdPrefix` support in the batch code path (see the usage sketch below)
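    
    As a hedged sketch of the user-facing result (not part of this commit;
    the broker address and topic name are placeholders), a batch query can
    now control the prefix of its generated consumer group id:
    
        // Batch reads now honor groupIdPrefix, matching the streaming path.
        val df = spark.read
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
          .option("subscribe", "topic1")                       // placeholder topic
          // group.id becomes "my-app-<random UUID>" instead of the default
          // "spark-kafka-relation-<random UUID>"
          .option("groupIdPrefix", "my-app")
          .load()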
    
    ## How was this patch tested?
    
    Additional + existing unit tests.
    
    Closes #25030 from gaborgsomogyi/SPARK-28232.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/kafka010/KafkaBatch.scala |  2 +-
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  2 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  6 ++++--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 22 ++++++++++++++++----
 .../spark/sql/kafka010/KafkaRelationSuite.scala    | 24 +++++++++++++++++-----
 5 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
index 6844905..e3c8536 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
@@ -47,7 +47,7 @@ private[kafka010] class KafkaBatch(
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
+    val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)
 
     val kafkaOffsetReader = new KafkaOffsetReader(
       strategy,
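
The comment in the hunk above is the heart of the change: Kafka balances a
topic's partitions across all consumers that share a group.id, so a query
reusing another query's group id would be assigned only a subset of the
partitions. A minimal sketch of two such same-group consumers (placeholder
broker address and topic; assumes kafka-clients on the classpath):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer

    def consumerInGroup(groupId: String): KafkaConsumer[String, String] = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // placeholder broker
      props.put("group.id", groupId)
      props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("topic1")) // placeholder topic
      consumer
    }

    // Both consumers join the SAME group, so Kafka splits the topic's
    // partitions between them and each one polls only partial data; this is
    // why every Spark query generates a unique group id instead.
    val a = consumerInGroup("shared-group")
    val b = consumerInGroup("shared-group")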
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 495b3d2..b2950cf 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -55,7 +55,7 @@ private[kafka010] class KafkaRelation(
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
+    val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)
 
     val kafkaOffsetReader = new KafkaOffsetReader(
       strategy,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c2f3170..a3ea918 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -590,8 +590,10 @@ private[kafka010] object KafkaSourceProvider extends Logging {
    * Returns a unique batch consumer group (group.id), allowing the user to set the prefix of
    * the consumer group
    */
-  private[kafka010] def batchUniqueGroupId(): String = {
-    s"spark-kafka-relation-${UUID.randomUUID}"
+  private[kafka010] def batchUniqueGroupId(parameters: Map[String, String]): String = {
+    val groupIdPrefix = parameters
+      .getOrElse(GROUP_ID_PREFIX, "spark-kafka-relation")
+    s"${groupIdPrefix}-${UUID.randomUUID}"
   }
 
   /**
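
For reference, a standalone approximation of the patched helper and the group
ids it produces (GROUP_ID_PREFIX is the option-key constant for
`groupIdPrefix`; the UUIDs below are made up):

    import java.util.UUID

    // Mirrors the new batchUniqueGroupId above, with the constant inlined.
    def batchUniqueGroupId(parameters: Map[String, String]): String = {
      val groupIdPrefix = parameters.getOrElse("groupIdPrefix", "spark-kafka-relation")
      s"$groupIdPrefix-${UUID.randomUUID}"
    }

    batchUniqueGroupId(Map.empty)
    // e.g. "spark-kafka-relation-6f1c2a5e-..." (the old default, unchanged)
    batchUniqueGroupId(Map("groupIdPrefix" -> "my-app"))
    // e.g. "my-app-6f1c2a5e-..." (new: user prefix, still unique per query)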
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 4adee49..3d14ebe 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -660,7 +660,23 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("allow group.id prefix") {
+    testGroupId("groupIdPrefix", (expected, actual) => {
+      assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected),
+        "Valid consumer groups don't contain the expected group id - " +
+        s"Valid consumer groups: $actual / expected group id: $expected")
+    })
+  }
+
   test("allow group.id override") {
+    testGroupId("kafka.group.id", (expected, actual) => {
+      assert(actual.exists(_ === expected), "Valid consumer groups don't " +
+        s"contain the expected group id - Valid consumer groups: $actual / " +
+        s"expected group id: $expected")
+    })
+  }
+
+  private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
     // Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
     // as well as KafkaOffsetReader.createConsumer(.)
     val topic = newTopic()
@@ -673,7 +689,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     val dsKafka = spark
       .readStream
       .format("kafka")
-      .option("kafka.group.id", customGroupId)
+      .option(groupIdKey, customGroupId)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
@@ -689,9 +705,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         val consumerGroups = testUtils.listConsumerGroups()
         val validGroups = consumerGroups.valid().get()
         val validGroupsId = validGroups.asScala.map(_.groupId())
-        assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
-          s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
-          s"expected group id: $customGroupId")
+        validateGroupId(customGroupId, validGroupsId)
       }
     )
   }
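
The refactor above threads a validator function through testGroupId, and the
two tests assert different things. A minimal standalone sketch of the two
predicates (group ids are made up; plain == stands in for ScalaTest's ===):

    // Prefix case: some group id must start with the prefix, but none may
    // equal it exactly, because a random UUID is appended to the prefix.
    def prefixOk(expected: String, actual: Iterable[String]): Boolean =
      actual.exists(_.startsWith(expected)) && !actual.exists(_ == expected)

    // Override case: the user-supplied group id must appear verbatim.
    def overrideOk(expected: String, actual: Iterable[String]): Boolean =
      actual.exists(_ == expected)

    prefixOk("my-app", Seq("my-app-123e4567-e89b"))  // true: prefixed, not exact
    overrideOk("id-42", Seq("id-42"))                // true: exact match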
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index a49985c..84d1ab6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -247,7 +247,23 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }
 
-  test("allow group.id overriding") {
+  test("allow group.id prefix") {
+    testGroupId("groupIdPrefix", (expected, actual) => {
+      assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected),
+        "Valid consumer groups don't contain the expected group id - " +
+        s"Valid consumer groups: $actual / expected group id: $expected")
+    })
+  }
+
+  test("allow group.id override") {
+    testGroupId("kafka.group.id", (expected, actual) => {
+      assert(actual.exists(_ === expected), "Valid consumer groups don't " +
+        s"contain the expected group id - Valid consumer groups: $actual / " +
+        s"expected group id: $expected")
+    })
+  }
+
+  private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
     // Tests code path KafkaSourceProvider.createRelation(.)
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
@@ -256,15 +272,13 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
     val customGroupId = "id-" + Random.nextInt()
-    val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
+    val df = createDF(topic, withOptions = Map(groupIdKey -> customGroupId))
     checkAnswer(df, (1 to 30).map(_.toString).toDF())
 
     val consumerGroups = testUtils.listConsumerGroups()
     val validGroups = consumerGroups.valid().get()
     val validGroupsId = validGroups.asScala.map(_.groupId())
-    assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
-      s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
-      s"expected group id: $customGroupId")
+    validateGroupId(customGroupId, validGroupsId)
   }
 
   test("read Kafka transactional messages: read_committed") {

