You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/30 07:16:51 UTC

[spark] branch branch-3.0 updated: [MINOR][SS] Call fetchEarliestOffsets when it is necessary

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 91a2260  [MINOR][SS] Call fetchEarliestOffsets when it is necessary
91a2260 is described below

commit 91a2260890683a7d0dbc745fab087bf265e68243
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Dec 30 16:15:41 2020 +0900

    [MINOR][SS] Call fetchEarliestOffsets when it is necessary
    
    ### What changes were proposed in this pull request?
    
    This minor patch changes two variables where calling `fetchEarliestOffsets` to `lazy` because these values are not always necessary.
    
    ### Why are the changes needed?
    
    To avoid unnecessary Kafka RPC calls.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #30969 from viirya/ss-minor3.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 4a669f583089fc704cdc46cff8f1680470a068ee)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala     | 2 +-
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 6599e7e..e60054d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -167,7 +167,7 @@ private[kafka010] class KafkaMicroBatchStream(
       limit: Long,
       from: PartitionOffsetMap,
       until: PartitionOffsetMap): PartitionOffsetMap = {
-    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    lazy val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
     val sizes = until.flatMap {
       case (tp, end) =>
         // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 57879c7..a1397de 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSource(
       limit: Long,
       from: Map[TopicPartition, Long],
       until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
-    val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    lazy val fromNew = kafkaReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
     val sizes = until.flatMap {
       case (tp, end) =>
         // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org