Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/20 17:50:39 UTC
spark git commit: [SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD
Repository: spark
Updated Branches:
refs/heads/master 84b245f2d -> 947f4f252
[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD
## What changes were proposed in this pull request?
The newly implemented Structured Streaming `KafkaSource` already computed the preferred location for each topic partition, but did not expose this information through the RDD's `getPreferredLocations` method. This patch adds that method to `KafkaSourceRDD`.
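The description says only that `KafkaSource` computes a preferred location per topic partition, not how. The following is a minimal sketch, assuming one plausible strategy: hash each partition onto a sorted list of executors so the same partition maps to the same executor in every batch. `TopicPartitionKey`, `assignPreferredLocations`, and the executor ID strings are hypothetical illustrations, not Spark or Kafka APIs, and the real `KafkaSource` logic may differ.

```scala
// Hypothetical stand-in for Kafka's TopicPartition, so the sketch needs no Kafka dependency.
final case class TopicPartitionKey(topic: String, partition: Int)

object PreferredLocationSketch {
  /** Hypothetical helper: maps each topic partition to one executor location,
   *  or to None when no executors are known. Sorting first makes the result
   *  independent of the order in which executors were reported. */
  def assignPreferredLocations(
      partitions: Seq[TopicPartitionKey],
      executors: Seq[String]): Map[TopicPartitionKey, Option[String]] = {
    val sorted = executors.sorted
    partitions.map { tp =>
      val loc =
        if (sorted.isEmpty) None
        // floorMod keeps the index non-negative even when hashCode is negative.
        else Some(sorted(java.lang.Math.floorMod(tp.hashCode, sorted.length)))
      tp -> loc
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(TopicPartitionKey("events", 0), TopicPartitionKey("events", 1))
    // Placeholder executor IDs; the real location string format is not shown in this commit.
    val execs = Seq("exec-2", "exec-1")
    assignPreferredLocations(parts, execs).foreach { case (tp, loc) =>
      println(s"$tp -> $loc")
    }
  }
}
```

A stable assignment like this is only useful if the RDD actually reports it to the scheduler, which is what the patch below adds.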
## How was this patch tested?
Manual verification.
Author: jerryshao <ss...@hortonworks.com>
Closes #15545 from jerryshao/SPARK-17999.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/947f4f25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/947f4f25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/947f4f25
Branch: refs/heads/master
Commit: 947f4f25273161dc4719419a35613a71c2e2a150
Parents: 84b245f
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Oct 20 10:50:34 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Oct 20 10:50:34 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/947f4f25/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e..802dd04 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
     buf.toArray
   }
 
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val part = split.asInstanceOf[KafkaSourceRDDPartition]
+    part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+  }
+
   override def compute(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
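The new override turns each partition's `Option[String]` preferred location into the `Seq[String]` that `RDD.getPreferredLocations` returns: `Some(loc)` becomes a one-element sequence and `None` becomes an empty one, which Spark treats as "no preference". A minimal sketch of that conversion outside Spark, assuming a hypothetical `OffsetRangeSketch` case class in place of the real partition's offset range:

```scala
// Hypothetical stand-in for the offset range carried by each KafkaSourceRDDPartition.
final case class OffsetRangeSketch(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

object PreferredLocationsConversion {
  // Same expression as the patch: Some(loc) -> Seq(loc), None -> empty Seq.
  def asInPatch(range: OffsetRangeSketch): Seq[String] =
    range.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)

  // Equivalent shorter form using Option.toList.
  def viaToList(range: OffsetRangeSketch): Seq[String] =
    range.preferredLoc.toList

  def main(args: Array[String]): Unit = {
    val pinned = OffsetRangeSketch("events", 0, 0L, 100L, Some("exec-1"))
    val unpinned = pinned.copy(preferredLoc = None)
    println(asInPatch(pinned))
    println(asInPatch(unpinned))
  }
}
```

`Option.toList` yields the same result in one call; the patch's `map(Seq(_)).getOrElse(Seq.empty)` form simply spells out both cases.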