Posted to commits@spark.apache.org by ko...@apache.org on 2018/04/18 02:09:11 UTC

spark git commit: [SPARK-22968][DSTREAM] Throw an exception on partition revoking issue

Repository: spark
Updated Branches:
  refs/heads/master 1ca3c50fe -> 5fccdae18


[SPARK-22968][DSTREAM] Throw an exception on partition revoking issue

## What changes were proposed in this pull request?

Kafka partitions can be revoked when new consumers join the consumer group to rebalance the partitions. But the current Spark Kafka connector code assumes there are no partition-revoking scenarios, so trying to get the latest offset from a revoked partition throws an exception, as mentioned in the JIRA.
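
For context, the exception comes from asking the cached Kafka consumer for the position of a partition it no longer owns. Here is a minimal sketch of the failing call pattern, assuming an illustrative topic name and consumer wiring; Kafka's `position()` is only valid for partitions currently assigned to the consumer and otherwise throws an `IllegalStateException` that says nothing about group-id misuse:

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Suppose the stream still tracks kssh-0 in currentOffsets, but a rebalance
// has handed that partition to another consumer in the same group.
def latestOffset(c: KafkaConsumer[String, String]): Long = {
  val revoked = new TopicPartition("kssh", 0) // illustrative topic/partition
  // position() throws IllegalStateException for partitions not assigned to
  // this consumer -- the confusing error this patch replaces.
  c.position(revoked)
}
```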

Partition revoking happens when a new consumer joins the consumer group, which means different streaming apps are trying to use the same group id. This is fundamentally incorrect: different apps should use different consumer groups. So instead of letting Kafka throw a confusing exception, this change identifies the revoked partitions and directly throws a meaningful exception when a partition is revoked.
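
As a sketch of the intended configuration (the broker address and group names are placeholders), each application builds its `kafkaParams` with its own unique group id, mirroring the parameters added to the example below:

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Each streaming application must pass its own, unique consumer group id;
// "broker1:9092" and the group names are placeholders.
def kafkaParams(groupId: String): Map[String, Object] = Map(
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

// App 1 would use kafkaParams("app-1-group"), app 2 kafkaParams("app-2-group").
```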

Besides, this PR also fixes bugs in `DirectKafkaWordCount`; without the fix, this example simply cannot work.

```
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream
```
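
The log above shows eight tracked partitions shrinking to four after the rebalance. The detection added in `DirectKafkaInputDStream` below is a plain set difference between the partitions the stream was tracking and the consumer's current assignment; a small sketch using the partition numbers from the log:

```scala
import org.apache.kafka.common.TopicPartition

// Abridged from the log above: kssh-0..kssh-7 were tracked before the
// rebalance, but only kssh-4..kssh-7 remain assigned afterwards.
val tracked  = (0 to 7).map(i => new TopicPartition("kssh", i)).toSet
val assigned = (4 to 7).map(i => new TopicPartition("kssh", i)).toSet
val revoked  = tracked.diff(assigned) // kssh-0..kssh-3, reported in the new error
```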

## How was this patch tested?

This was manually verified in a local cluster. Unfortunately, I'm not sure how to simulate this in a unit test, so I'm proposing the PR without a UT added.

Author: jerryshao <ss...@hortonworks.com>

Closes #21038 from jerryshao/SPARK-22968.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fccdae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fccdae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fccdae1

Branch: refs/heads/master
Commit: 5fccdae18911793967b315c02c058eb737e46174
Parents: 1ca3c50
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Apr 17 21:08:42 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Tue Apr 17 21:08:42 2018 -0500

----------------------------------------------------------------------
 .../examples/streaming/DirectKafkaWordCount.scala  | 17 +++++++++++++----
 .../kafka010/DirectKafkaInputDStream.scala         | 12 ++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5fccdae1/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
index def0602..2082fb7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -18,6 +18,9 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming
 
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka010._
@@ -26,18 +29,20 @@ import org.apache.spark.streaming.kafka010._
  * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: DirectKafkaWordCount <brokers> <topics>
+ * Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
  *   <brokers> is a list of one or more Kafka brokers
+ *   <groupId> is a consumer group name to consume from topics
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
  *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
- *    topic1,topic2
+ *    consumer-group topic1,topic2
  */
 object DirectKafkaWordCount {
   def main(args: Array[String]) {
-    if (args.length < 2) {
+    if (args.length < 3) {
       System.err.println(s"""
-        |Usage: DirectKafkaWordCount <brokers> <topics>
+        |Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
         |  <brokers> is a list of one or more Kafka brokers
+        |  <groupId> is a consumer group name to consume from topics
         |  <topics> is a list of one or more kafka topics to consume from
         |
         """.stripMargin)
@@ -46,7 +51,7 @@ object DirectKafkaWordCount {
 
     StreamingExamples.setStreamingLogLevels()
 
-    val Array(brokers, topics) = args
+    val Array(brokers, groupId, topics) = args
 
     // Create context with 2 second batch interval
     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
@@ -54,7 +59,11 @@ object DirectKafkaWordCount {
 
     // Create direct kafka stream with brokers and topics
     val topicsSet = topics.split(",").toSet
-    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+    val kafkaParams = Map[String, Object](
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
+      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
     val messages = KafkaUtils.createDirectStream[String, String](
       ssc,
       LocationStrategies.PreferConsistent,

http://git-wip-us.apache.org/repos/asf/spark/blob/5fccdae1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 215b7ca..c322148 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -190,8 +190,20 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
     // make sure new partitions are reflected in currentOffsets
     val newPartitions = parts.diff(currentOffsets.keySet)
+
+    // Check if any partitions have been revoked because of a consumer rebalance.
+    val revokedPartitions = currentOffsets.keySet.diff(parts)
+    if (revokedPartitions.nonEmpty) {
+      throw new IllegalStateException(s"Previously tracked partitions " +
+        s"${revokedPartitions.mkString("[", ",", "]")} have been revoked by Kafka because of a " +
+        s"consumer rebalance. This usually means another stream with the same group id has " +
+        s"joined. Please check whether different streaming applications are misconfigured to " +
+        s"use the same group id; fundamentally, different streams should use different group ids")
+    }
+
     // position for new partitions determined by auto.offset.reset if no commit
     currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+
     // don't want to consume messages, so pause
     c.pause(newPartitions.asJava)
     // find latest available offsets


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org