You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rschwagercharter (via GitHub)" <gi...@apache.org> on 2024/01/30 18:52:51 UTC

[PR] [SPARK-46798] Kafka custom partition location assignment in Spark Structured Streaming (rack awareness) [spark]

rschwagercharter opened a new pull request, #44954:
URL: https://github.com/apache/spark/pull/44954

   ### What changes were proposed in this pull request?
   Add support for custom partition location assignment for Kafka sources in Structured Streaming.
   
   
   ### Why are the changes needed?
   
   [Please see the design doc for greater detail and further discussion](https://docs.google.com/document/d/1RoEk_mt8AUh9sTQZ1NfzIuuYKf1zx6BP1K3IlJ2b8iM/edit#heading=h.pbt6pdb2jt5c)
   
   [SPARK-15406](https://issues.apache.org/jira/browse/SPARK-15406) Added Kafka consumer support to Spark Structured Streaming, but it did not add custom partition location assignment as a feature. The Structured Streaming Kafka consumer as it exists today evenly allocates Kafka topic partitions to executors without regard to Kafka broker rack information or executor location. This behavior can drive large cross-AZ networking costs in large deployments.
   
   [The design doc for SPARK-15406](https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit#heading=h.k36c6oyz89xw) described the ability to assign Kafka partitions to particular executors (a feature which would enable rack awareness), but it seems that feature was never implemented.
   
   For DStreams users, there does seem to be a way to assign Kafka partitions to Spark executors in a custom fashion with [LocationStrategies.PreferFixed](https://github.com/apache/spark/blob/master/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala#L69), so this sort of functionality has a precedent.
   
   
   ### Does this PR introduce _any_ user-facing change?
   An additional parameter will be accepted on the Kafka source provider. This parameter is provisionally named `partitionlocationassigner`. The parameter takes a class name, which when instantiated gives the Kafka source with user-provided Kafka partition location suggestions. The class should implement a new trait defined in this PR and described in the [design document](https://docs.google.com/document/d/1RoEk_mt8AUh9sTQZ1NfzIuuYKf1zx6BP1K3IlJ2b8iM/edit#heading=h.pbt6pdb2jt5c).
   
   ### How was this patch tested?
   Unit tests are forthcoming. 
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] (DRAFT) [SPARK-46798] Kafka custom partition location assignment in Spark Structured Streaming (rack awareness) [spark]

Posted by "subham611 (via GitHub)" <gi...@apache.org>.
subham611 commented on code in PR #44954:
URL: https://github.com/apache/spark/pull/44954#discussion_r1479865847


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala:
##########
@@ -62,16 +65,20 @@ private[kafka010] sealed trait ConsumerStrategy extends Logging {
       .setAuthenticationConfigIfNeeded()
       .build()
 
-  protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
-    admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
-      case (topic, topicDescription) =>
-        topicDescription.partitions().asScala.map { topicPartitionInfo =>
-          val partition = topicPartitionInfo.partition()
-          logDebug(s"Partition found: $topic:$partition")
-          new TopicPartition(topic, partition)
-        }
-    }.toSet
-  }
+  protected def retrieveAllPartitions(
+    admin: Admin,
+    topics: Set[String]): Set[PartitionDescription] =
+    admin.describeTopics(topics.asJava)
+      .all()
+      .get()
+      .asScala
+      .map(_._2)
+      .filterNot(_.isInternal)
+      .flatMap { (topicDescription: TopicDescription) =>
+        val topic = topicDescription.name()
+        topicDescription.partitions.asScala
+          .map(tpi => PartitionDescription.fromTopicPartitionInfo(topic, tpi))
+      }.toSet

Review Comment:
   ignore this. This is just reformatting due to scala linter



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] (DRAFT) [SPARK-46798] Kafka custom partition location assignment in Spark Structured Streaming (rack awareness) [spark]

Posted by "subham611 (via GitHub)" <gi...@apache.org>.
subham611 commented on code in PR #44954:
URL: https://github.com/apache/spark/pull/44954#discussion_r1479865847


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala:
##########
@@ -62,16 +65,20 @@ private[kafka010] sealed trait ConsumerStrategy extends Logging {
       .setAuthenticationConfigIfNeeded()
       .build()
 
-  protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
-    admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
-      case (topic, topicDescription) =>
-        topicDescription.partitions().asScala.map { topicPartitionInfo =>
-          val partition = topicPartitionInfo.partition()
-          logDebug(s"Partition found: $topic:$partition")
-          new TopicPartition(topic, partition)
-        }
-    }.toSet
-  }
+  protected def retrieveAllPartitions(
+    admin: Admin,
+    topics: Set[String]): Set[PartitionDescription] =
+    admin.describeTopics(topics.asJava)
+      .all()
+      .get()
+      .asScala
+      .map(_._2)
+      .filterNot(_.isInternal)
+      .flatMap { (topicDescription: TopicDescription) =>
+        val topic = topicDescription.name()
+        topicDescription.partitions.asScala
+          .map(tpi => PartitionDescription.fromTopicPartitionInfo(topic, tpi))
+      }.toSet

Review Comment:
   ignore this. This is just reformatting due to scala linter



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org