Posted to commits@spark.apache.org by td...@apache.org on 2018/04/24 21:33:57 UTC

spark git commit: [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming

Repository: spark
Updated Branches:
  refs/heads/master 379bffa05 -> 7b1e6523a


[SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming

## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the KafkaMicroBatchReader is created. However, we create a dummy KafkaMicroBatchReader just to get the schema and immediately stop it. It's better to make the consumer creation lazy, so that it is created only on the first attempt to fetch offsets through the KafkaOffsetReader. A minimal sketch of the lazy-initialization pattern is shown below; the names (DriverClient, createClient-style stand-ins) are hypothetical and simplified, not the actual Spark classes, which appear in the diff further down.
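
```scala
// Illustrative sketch only, assuming a simplified stand-in (DriverClient)
// for an expensive resource such as a KafkaConsumer. Not the actual Spark code.
object LazyInitSketch {

  // Stand-in for an expensive-to-create resource.
  class DriverClient {
    def close(): Unit = println("client closed")
  }

  @volatile private var _client: DriverClient = null

  // Created only on first access, not when the enclosing object is constructed.
  private def client: DriverClient = synchronized {
    if (_client == null) {
      _client = new DriverClient()   // expensive construction deferred until needed
    }
    _client
  }

  // Closing is a no-op if the resource was never requested.
  def close(): Unit = synchronized {
    if (_client != null) _client.close()
  }

  def main(args: Array[String]): Unit = {
    close()          // nothing was created yet, so nothing to close
    val c = client   // first access triggers creation
    close()          // now there is a client to close
  }
}
```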

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #21134 from tdas/SPARK-24056.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b1e6523
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b1e6523
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b1e6523

Branch: refs/heads/master
Commit: 7b1e6523af3c96043aa8d2763e5f18b6e2781c3d
Parents: 379bffa
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Apr 24 14:33:33 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Apr 24 14:33:33 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaOffsetReader.scala  | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b1e6523/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 551641c..8206669 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -75,7 +75,17 @@ private[kafka010] class KafkaOffsetReader(
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
-  protected var consumer = createConsumer()
+  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+
+  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer == null) {
+      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
+      newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    }
+    _consumer
+  }
 
   private val maxOffsetFetchAttempts =
     readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
@@ -95,9 +105,7 @@ private[kafka010] class KafkaOffsetReader(
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    runUninterruptibly {
-      consumer.close()
-    }
+    if (_consumer != null) runUninterruptibly { stopConsumer() }
     kafkaReaderThread.shutdown()
   }
 
@@ -304,19 +312,14 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
-  /**
-   * Create a consumer using the new generated group id. We always use a new consumer to avoid
-   * just using a broken consumer to retry on Kafka errors, which likely will fail again.
-   */
-  private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
-    val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-    consumerStrategy.createConsumer(newKafkaParams)
+  private def stopConsumer(): Unit = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer != null) _consumer.close()
   }
 
   private def resetConsumer(): Unit = synchronized {
-    consumer.close()
-    consumer = createConsumer()
+    stopConsumer()
+    _consumer = null  // will automatically get reinitialized again
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org