You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/03 17:40:50 UTC

[GitHub] [spark] viirya commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API

viirya commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r499167165



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
##########
@@ -36,31 +37,40 @@ import org.apache.spark.kafka010.KafkaConfigUpdater
  * All three strategies have overloaded constructors that allow you to specify
  * the starting offset for a particular partition.
  */
-private[kafka010] sealed trait ConsumerStrategy {
-  /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
-  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
-
-  /**
-   * Updates the parameters with security if needed.
-   * Added a function to hide internals and reduce code duplications because all strategy uses it.
-   */
-  protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
-    KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+private[kafka010] sealed trait ConsumerStrategy extends Logging {
+  /** Creates an [[org.apache.kafka.clients.admin.AdminClient]] */
+  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin = {
+    val updatedKafkaParams = KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
       .setAuthenticationConfigIfNeeded()
       .build()
+    logDebug(s"Admin params: ${KafkaRedactionUtil.redactParams(updatedKafkaParams.asScala.toSeq)}")
+    Admin.create(updatedKafkaParams)
+  }
+
+  /** Returns the assigned or subscribed [[TopicPartition]] */
+  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
+
+  protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
+    admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
+      case (topic, topicDescription) =>
+        topicDescription.partitions().asScala.map { topicPartitionInfo =>
+          val partition = topicPartitionInfo.partition()
+          logDebug(s"Partition added: $topic:$partition")

Review comment:
       Will this message mislead users? For example, `AssignStrategy` is only interested in the specified partitions among all the partitions here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org