Posted to commits@spark.apache.org by zs...@apache.org on 2017/02/13 07:00:25 UTC
spark git commit: [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group
Repository: spark
Updated Branches:
refs/heads/master bc0a0e639 -> 2bdbc8705
[SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group
## What changes were proposed in this pull request?
In `KafkaOffsetReader`, when an error occurs, we abort the existing consumer and create a new one. In the current implementation, the first consumer and the second consumer end up in the same group (which leads to SPARK-19559), **_violating our intention that the two consumers never share a group._**
The cause is that, in the current implementation, the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Even though `nextId` is incremented while that first consumer is created, the field initializers that run afterwards reset `groupId` and `nextId` back to their default values, so the second consumer is created with the same group id as the first.
We should make sure that `groupId` and `nextId` are initialized before any consumer is created.
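The pitfall is Scala's constructor semantics: statements and field initializers in a class body run in declaration order, so code that runs before a `var`'s declaration sees the JVM default value, and the initializer then overwrites any update made earlier in construction. A minimal standalone sketch (hypothetical class and field names, not the actual `KafkaOffsetReader` code) of the buggy ordering:

```scala
// Sketch of the initialization-order bug behind SPARK-19564 (hypothetical names).
// Field initializers run top to bottom, so `firstGroup` is computed while
// `nextId` still holds its JVM default (0), and the `nextId = 0` initializer
// afterwards wipes out the increment made inside nextGroupId().
class Reader(prefix: String) {
  // Runs first: reads nextId = 0 (default), then sets nextId = 1.
  val firstGroup: String = nextGroupId()

  // Runs second: resets nextId back to 0, losing the increment above.
  private var nextId = 0

  private def nextGroupId(): String = {
    val id = prefix + "-" + nextId
    nextId += 1
    id
  }

  // Runs third: reads nextId = 0 again, producing the SAME group id.
  val secondGroup: String = nextGroupId()
}

val r = new Reader("spark-kafka")
println(r.firstGroup)  // spark-kafka-0
println(r.secondGroup) // spark-kafka-0 -- collision: both "consumers" share a group
```

Declaring `nextId` (and `groupId`) above any code that calls `nextGroupId()` — as the diff below does — makes the initializer run before the first use, so the second call yields a distinct id.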
## How was this patch tested?
Ran `KafkaSourceSuite` 100 times; all runs passed.
Author: Liwei Lin <lw...@gmail.com>
Closes #16902 from lw-lin/SPARK-19564-.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bdbc870
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bdbc870
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bdbc870
Branch: refs/heads/master
Commit: 2bdbc87052389ff69404347fbc69457132dbcafd
Parents: bc0a0e6
Author: Liwei Lin <lw...@gmail.com>
Authored: Sun Feb 12 23:00:22 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Sun Feb 12 23:00:22 2017 -0800
----------------------------------------------------------------------
.../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2bdbc870/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 6b2fb3c..2696d6f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -65,6 +65,13 @@ private[kafka010] class KafkaOffsetReader(
val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
/**
+ * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
+ * created -- see SPARK-19564.
+ */
+ private var groupId: String = null
+ private var nextId = 0
+
+ /**
* A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
* offsets and never commits them.
*/
@@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader(
private val offsetFetchAttemptIntervalMs =
readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
- private var groupId: String = null
-
- private var nextId = 0
-
private def nextGroupId(): String = {
groupId = driverGroupIdPrefix + "-" + nextId
nextId += 1