Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/14 06:22:24 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API

HeartSaVioR commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r487652330



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
##########
@@ -36,31 +37,37 @@ import org.apache.spark.kafka010.KafkaConfigUpdater
  * All three strategies have overloaded constructors that allow you to specify
  * the starting offset for a particular partition.
  */
-private[kafka010] sealed trait ConsumerStrategy {
-  /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
-  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
-
-  /**
-   * Updates the parameters with security if needed.
-   * Added a function to hide internals and reduce code duplications because all strategy uses it.
-   */
-  protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
-    KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+private[kafka010] sealed trait ConsumerStrategy extends Logging {
+  /** Creates an [[org.apache.kafka.clients.admin.AdminClient]] */
+  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin = {
+    val updatedKafkaParams = KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
       .setAuthenticationConfigIfNeeded()
       .build()
+    logDebug(s"Admin params: ${KafkaRedactionUtil.redactParams(updatedKafkaParams.asScala.toSeq)}")
+    Admin.create(updatedKafkaParams)
+  }
+
+  /** Returns the assigned or subscribed [[TopicPartition]] */
+  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
 }

Review comment:
    Looks like we can extract the duplicated code among the strategies, which retrieves all topic partitions via AdminClient.
   
   ```suggestion
      protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
        admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
          case (topic, topicDescription) =>
            topicDescription.partitions().asScala.map { topicPartitionInfo =>
              val partition = topicPartitionInfo.partition()
              logDebug(s"Partition added: $topic:$partition")
              new TopicPartition(topic, partition)
            }
        }.toSet
      }
    }
   ```
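
    Each strategy's `assignedTopicPartitions` could then delegate to it. A rough sketch for the subscribe case (illustrative only - the exact field depends on how each strategy stores its topics):

    ```scala
    override def assignedTopicPartitions(admin: Admin): Set[TopicPartition] =
      retrieveAllPartitions(admin, topics.toSet)
    ```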

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -46,41 +49,35 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
     val driverKafkaParams: ju.Map[String, Object],
-    readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends Logging {
+    readerOptions: CaseInsensitiveMap[String]) extends Logging {
 
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an
    * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
    * [[UninterruptibleThread]], however for batch mode this is not the case.
    */
   val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
 
-  /**
-   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
-   * created -- see SPARK-19564.
-   */
-  private var groupId: String = null
-  private var nextId = 0
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
-  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+  @volatile protected var _admin: Admin = null
 
-  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+  protected def admin: Admin = synchronized {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-    if (_consumer == null) {
-      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-      }
-      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    if (_admin == null) {
+      _admin = consumerStrategy.createAdmin(driverKafkaParams)
     }
-    _consumer
+    _admin
   }
 
+  def isolationLevel(): IsolationLevel = {
+    driverKafkaParams.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG) match {

Review comment:
    It's a bit confusing since null can be assigned to a String, and this is Scala pattern matching, where null isn't a welcome value. How about wrapping it in an Option like below:
   
   ```suggestion
       Option(driverKafkaParams.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) match {
         case Some(s: String) => IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT))
         case _ => IsolationLevel.valueOf(
           ConsumerConfig.DEFAULT_ISOLATION_LEVEL.toUpperCase(Locale.ROOT))
       }
   ```

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -46,41 +49,35 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
     val driverKafkaParams: ju.Map[String, Object],
-    readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends Logging {
+    readerOptions: CaseInsensitiveMap[String]) extends Logging {
 
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an
    * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
    * [[UninterruptibleThread]], however for batch mode this is not the case.
    */
   val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
 
-  /**
-   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
-   * created -- see SPARK-19564.
-   */
-  private var groupId: String = null
-  private var nextId = 0
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
-  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+  @volatile protected var _admin: Admin = null
 
-  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+  protected def admin: Admin = synchronized {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-    if (_consumer == null) {
-      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-      }
-      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    if (_admin == null) {
+      _admin = consumerStrategy.createAdmin(driverKafkaParams)
     }
-    _consumer
+    _admin
   }
 
+  def isolationLevel(): IsolationLevel = {

Review comment:
    Unless we assume `driverKafkaParams` can change, this can be just a `val` or `lazy val`.
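
    For example, a minimal sketch combining this with the Option-based matching suggested above (assuming `driverKafkaParams` is fixed at construction):

    ```scala
    // Sketch only: cache the parsed isolation level since driverKafkaParams doesn't change.
    lazy val isolationLevel: IsolationLevel =
      Option(driverKafkaParams.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) match {
        case Some(s: String) => IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT))
        case _ => IsolationLevel.valueOf(
          ConsumerConfig.DEFAULT_ISOLATION_LEVEL.toUpperCase(Locale.ROOT))
      }
    ```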

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -33,10 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
 
 /**
- * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets from Kafka.
+ * This class uses Kafka's its own [[Admin]] API to read data offsets from Kafka.

Review comment:
    nit: Sounds like `Kafka's` and `its` mean the same thing here, so one of them is redundant.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -305,11 +277,12 @@ private[kafka010] class KafkaOffsetReader(
   def fetchLatestOffsets(

Review comment:
    Looks like all of the workarounds applied here are related to the consumer (group) - they're no longer relevant now that we've switched to Admin. If we can agree to delete the consumer workarounds (here and in other places as well), we can reduce the complexity considerably, but let's hear others' voices on this.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -105,34 +102,16 @@ private[kafka010] class KafkaOffsetReader(
     minPartitions.map(_ > numTopicPartitions).getOrElse(false)
   }
 
-  private def nextGroupId(): String = {
-    groupId = driverGroupIdPrefix + "-" + nextId
-    nextId += 1
-    groupId
-  }
-
   override def toString(): String = consumerStrategy.toString
 
   /**
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    if (_consumer != null) uninterruptibleThreadRunner.runUninterruptibly { stopConsumer() }
+    if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() }

Review comment:
    It would be nice if we could get rid of that, as it adds complexity - better to double-check with the Kafka community.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -689,57 +691,6 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
-  test("allow group.id prefix") {

Review comment:
    Let's remove them from the docs as well. (There should be a configuration entry as well as an explanation around group id that need to be removed too.)

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -213,64 +180,70 @@ private[kafka010] class KafkaOffsetReader(
       assert(partitions.asScala == partitionTimestamps.keySet,
         "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
           s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+      logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
     }
 
     val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
+        val listOffsetsParams = partitionTimestamps.map { p =>

Review comment:
    I think `case (tp, timestamp)` is better in terms of readability.
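
    i.e. the same code, just with the destructuring pattern:

    ```scala
    val listOffsetsParams = partitionTimestamps.map { case (tp, timestamp) =>
      tp -> OffsetSpec.forTimestamp(timestamp)
    }.asJava
    ```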

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -213,64 +180,70 @@ private[kafka010] class KafkaOffsetReader(
       assert(partitions.asScala == partitionTimestamps.keySet,
         "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
           s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+      logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
     }
 
     val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
+        val listOffsetsParams = partitionTimestamps.map { p =>
+          p._1 -> OffsetSpec.forTimestamp(p._2)
         }.asJava
+        admin.listOffsets(listOffsetsParams, listOffsetsOptions()).all().get().asScala.map {
+          case (tp, offsetSpec) =>
+            if (failsOnNoMatchingOffset) {
+              assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestamps(tp)}.")
+            }
 
-        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
-          consumer.offsetsForTimes(converted)
-
-        offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetAndTimestamp != null, "No offset matched from request of " +
-              s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
-          }
-
-          if (offsetAndTimestamp == null) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetAndTimestamp.offset()
-          }
+            if (offsetSpec == null) {

Review comment:
    Could you please elaborate on when that can happen? I'd like to look into the change carefully.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -213,64 +180,70 @@ private[kafka010] class KafkaOffsetReader(
       assert(partitions.asScala == partitionTimestamps.keySet,
         "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
           s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+      logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
     }
 
     val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
+        val listOffsetsParams = partitionTimestamps.map { p =>
+          p._1 -> OffsetSpec.forTimestamp(p._2)
         }.asJava
+        admin.listOffsets(listOffsetsParams, listOffsetsOptions()).all().get().asScala.map {
+          case (tp, offsetSpec) =>
+            if (failsOnNoMatchingOffset) {
+              assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestamps(tp)}.")
+            }
 
-        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
-          consumer.offsetsForTimes(converted)
-
-        offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetAndTimestamp != null, "No offset matched from request of " +
-              s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
-          }
-
-          if (offsetAndTimestamp == null) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetAndTimestamp.offset()
-          }
+            if (offsetSpec == null) {

Review comment:
    And if `offsetSpec` can be null, the `offsetSpec.offset()` call inside the `if (failsOnNoMatchingOffset)` block will throw an NPE.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -447,7 +422,7 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
-  private def getSortedExecutorList(): Array[String] = {
+  private def getSortedExecutorList: Array[String] = {

Review comment:
    This looks like an unnecessary change, though it doesn't seem to have any side effects, so it's semantically OK.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -305,11 +277,12 @@ private[kafka010] class KafkaOffsetReader(
   def fetchLatestOffsets(
       knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap =
     partitionsAssignedToConsumer { partitions => {
-      logDebug("Seeking to the end.")
-
       if (knownOffsets.isEmpty) {
-        consumer.seekToEnd(partitions)
-        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+        val listOffsetsParams = partitions.asScala.map(_ -> OffsetSpec.latest()).toMap.asJava

Review comment:
    This can be defined earlier to deduplicate the code.
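
    For example, a small helper could cover both the latest and earliest cases (just a sketch; the helper name `offsetSpecs` is made up):

    ```scala
    // Hypothetical helper: build the OffsetSpec map once so the call sites don't repeat it.
    private def offsetSpecs(
        partitions: ju.Set[TopicPartition],
        spec: OffsetSpec): ju.Map[TopicPartition, OffsetSpec] =
      partitions.asScala.map(_ -> spec).toMap.asJava
    ```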

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -213,64 +180,70 @@ private[kafka010] class KafkaOffsetReader(
       assert(partitions.asScala == partitionTimestamps.keySet,
         "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
           s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+      logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
     }
 
     val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
+        val listOffsetsParams = partitionTimestamps.map { p =>
+          p._1 -> OffsetSpec.forTimestamp(p._2)
         }.asJava
+        admin.listOffsets(listOffsetsParams, listOffsetsOptions()).all().get().asScala.map {
+          case (tp, offsetSpec) =>
+            if (failsOnNoMatchingOffset) {
+              assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestamps(tp)}.")
+            }
 
-        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
-          consumer.offsetsForTimes(converted)
-
-        offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetAndTimestamp != null, "No offset matched from request of " +
-              s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
-          }
-
-          if (offsetAndTimestamp == null) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetAndTimestamp.offset()
-          }
+            if (offsetSpec == null) {
+              tp -> KafkaOffsetRangeLimit.LATEST
+            } else {
+              tp -> offsetSpec.offset()
+            }
         }.toMap
       }
     }
 
-    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
-
-    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
-      fnAssertFetchedOffsets)
+    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
   }
 
   private def fetchSpecificOffsets0(
       fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
-      fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],
-      fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = {
+      fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]
+    ): KafkaSourceOffset = {
     val fetched = partitionsAssignedToConsumer {
       partitions => {
         fnAssertParametersWithPartitions(partitions)
 
         val partitionOffsets = fnRetrievePartitionOffsets(partitions)
 
-        partitionOffsets.foreach {
-          case (tp, KafkaOffsetRangeLimit.LATEST) =>
-            consumer.seekToEnd(ju.Arrays.asList(tp))
-          case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
-            consumer.seekToBeginning(ju.Arrays.asList(tp))
-          case (tp, off) => consumer.seek(tp, off)
+        val listOffsetsParams = partitionOffsets.filter { case (_, off) =>
+          off == KafkaOffsetRangeLimit.LATEST || off == KafkaOffsetRangeLimit.EARLIEST
+        }.map { case (tp, off) =>
+          off match {
+            case KafkaOffsetRangeLimit.LATEST =>
+              tp -> OffsetSpec.latest()
+            case KafkaOffsetRangeLimit.EARLIEST =>
+              tp -> OffsetSpec.earliest()
+          }
         }
-
-        partitionOffsets.map {
-          case (tp, _) => tp -> consumer.position(tp)
+        val resolvedPartitionOffsets = admin.listOffsets(listOffsetsParams.asJava,

Review comment:
    The line below looks to be used multiple times (roughly 4 times) - worth extracting.
   
   ```
   admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala
      .map(result => result._1 -> result._2.offset()).toMap
   ```
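
    For example (just a sketch; the helper name is made up, and it assumes `admin` and `listOffsetsOptions()` are in scope as in this class):

    ```scala
    // Hypothetical extraction of the repeated listOffsets-and-map-to-offsets pattern.
    private def fetchOffsets(
        listOffsetsParams: ju.Map[TopicPartition, OffsetSpec]): Map[TopicPartition, Long] =
      admin.listOffsets(listOffsetsParams, listOffsetsOptions()).all().get().asScala
        .map { case (tp, info) => tp -> info.offset() }.toMap
    ```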

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -46,41 +49,35 @@ import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
     val driverKafkaParams: ju.Map[String, Object],
-    readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends Logging {
+    readerOptions: CaseInsensitiveMap[String]) extends Logging {
 
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all Kafka communication called in an
    * [[UninterruptibleThread]]. In the case of streaming queries, we are already running in an
    * [[UninterruptibleThread]], however for batch mode this is not the case.
    */
   val uninterruptibleThreadRunner = new UninterruptibleThreadRunner("Kafka Offset Reader")
 
-  /**
-   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
-   * created -- see SPARK-19564.
-   */
-  private var groupId: String = null
-  private var nextId = 0
-
-  /**
-   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
-   * offsets and never commits them.
-   */
-  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+  @volatile protected var _admin: Admin = null
 
-  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+  protected def admin: Admin = synchronized {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-    if (_consumer == null) {
-      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-      }
-      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    if (_admin == null) {
+      _admin = consumerStrategy.createAdmin(driverKafkaParams)
     }
-    _consumer
+    _admin
   }
 
+  def isolationLevel(): IsolationLevel = {
+    driverKafkaParams.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG) match {
+      case s: String => IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT))
+      case null => IsolationLevel.valueOf(
+        ConsumerConfig.DEFAULT_ISOLATION_LEVEL.toUpperCase(Locale.ROOT))
+    }
+  }
+
+  private def listOffsetsOptions() = new ListOffsetsOptions(isolationLevel())

Review comment:
    Same here; if the instance can be reused, make it a `val` or `lazy val`.
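
    e.g. (just a sketch, assuming the options never need to change per call):

    ```scala
    private lazy val listOffsetsOptions = new ListOffsetsOptions(isolationLevel())
    ```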




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org