Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/30 13:20:11 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API

gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r532589093



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -50,15 +50,16 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     consumerStrategy: ConsumerStrategy,
     override val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String)
-  extends KafkaOffsetReaderBase(
-    consumerStrategy,
-    driverKafkaParams,
-    readerOptions,
-    driverGroupIdPrefix) with Logging {
+    driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
+
+  private[kafka010] val maxOffsetFetchAttempts =
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
+
+  private[kafka010] val offsetFetchAttemptIntervalMs =
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
 
   /**
-   * [[UninterruptibleThreadRunner]] ensures that all [[KafkaConsumer]] communication called in an
+   * [[UninterruptibleThreadRunner]] ensures that all [[Admin]] communication called in an

Review comment:
       This is not related to the change, but I found a bug in the original commit.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
##########
@@ -126,7 +127,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
   /**
    * @return The Set of TopicPartitions for a given topic
    */
-  override def fetchTopicPartitions(): Set[TopicPartition] =
+  private def fetchTopicPartitions(): Set[TopicPartition] =

Review comment:
       This is not related to the change, but since `fetchTopicPartitions` is only used in the consumer-based implementation, I've made it private and have not added it to the trait as public API.






