You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/30 07:16:51 UTC
[spark] branch branch-3.0 updated: [MINOR][SS] Call
fetchEarliestOffsets when it is necessary
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 91a2260 [MINOR][SS] Call fetchEarliestOffsets when it is necessary
91a2260 is described below
commit 91a2260890683a7d0dbc745fab087bf265e68243
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Dec 30 16:15:41 2020 +0900
[MINOR][SS] Call fetchEarliestOffsets when it is necessary
### What changes were proposed in this pull request?
This minor patch changes two variables whose values are obtained by calling `fetchEarliestOffsets` into `lazy` vals, because these values are not always needed.
### Why are the changes needed?
To avoid unnecessary Kafka RPC calls.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes #30969 from viirya/ss-minor3.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 4a669f583089fc704cdc46cff8f1680470a068ee)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 6599e7e..e60054d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -167,7 +167,7 @@ private[kafka010] class KafkaMicroBatchStream(
limit: Long,
from: PartitionOffsetMap,
until: PartitionOffsetMap): PartitionOffsetMap = {
- val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ lazy val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 57879c7..a1397de 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSource(
limit: Long,
from: Map[TopicPartition, Long],
until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
- val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ lazy val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org