You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/15 22:21:59 UTC
[spark] branch master updated: [SPARK-26350][FOLLOWUP] Add actual
verification on new UT introduced on SPARK-26350
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2ebb79b [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
2ebb79b is described below
commit 2ebb79b2a607aa25ea22826d9c5d6af18c97a7f2
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Jan 15 14:21:51 2019 -0800
[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
## What changes were proposed in this pull request?
This patch adds the check to verify consumer group id is given correctly when custom group id is provided to Kafka parameter.
## How was this patch tested?
Modified UT.
Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 14 ++++++++++++--
.../org/apache/spark/sql/kafka010/KafkaRelationSuite.scala | 13 ++++++++++++-
.../org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 6 +++++-
3 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 6402088..cb45384 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
+ val customGroupId = "id-" + Random.nextInt()
val dsKafka = spark
.readStream
.format("kafka")
- .option("kafka.group.id", "id-" + Random.nextInt())
+ .option("kafka.group.id", customGroupId)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testStream(dsKafka)(
makeSureGetOffsetCalled,
- CheckAnswer(1 to 30: _*)
+ CheckAnswer(1 to 30: _*),
+ Execute { _ =>
+ val consumerGroups = testUtils.listConsumerGroups()
+ val validGroups = consumerGroups.valid().get()
+ val validGroupsId = validGroups.asScala.map(_.groupId())
+ assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+ s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+ s"expected group id: $customGroupId")
+ }
)
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index efe7385..2cd13a9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConverters._
+import scala.util.Random
+
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
- val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+ val customGroupId = "id-" + Random.nextInt()
+ val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+ val consumerGroups = testUtils.listConsumerGroups()
+ val validGroups = consumerGroups.valid().get()
+ val validGroupsId = validGroups.asScala.map(_.groupId())
+ assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+ s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+ s"expected group id: $customGroupId")
}
test("read Kafka transactional messages: read_committed") {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index bf6934b..dacfffa 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
offsets
}
+ def listConsumerGroups(): ListConsumerGroupsResult = {
+ adminClient.listConsumerGroups()
+ }
+
protected def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org