Posted to commits@spark.apache.org by ka...@apache.org on 2022/06/07 12:27:25 UTC

[spark] branch master updated: [SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0965fa427a7 [SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
0965fa427a7 is described below

commit 0965fa427a70d2855945e2008ccdb86a4989d763
Author: azheng <az...@adobe.com>
AuthorDate: Tue Jun 7 21:27:07 2022 +0900

    [SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
    
    ### What changes were proposed in this pull request?
    Fixed some minor format issues in the code comments and rephrased some of them for better clarity
    
    ### Why are the changes needed?
    Minor format corrections and better readability
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Not needed; there are no functional code changes
    
    Closes #35484 from ArvinZheng/SPARK-38181.
    
    Authored-by: azheng <az...@adobe.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 37fe38ea94e..d88e9821489 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -267,20 +267,18 @@ private[kafka010] class KafkaDataConsumer(
    * within [offset, untilOffset).
    *
    * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * throw an exception when it detects an unavailable offset. If `failOnDataLoss` is `false`, this
+   * method will try to fetch the next available record within [offset, untilOffset). When this
+   * method reaches `untilOffset` and still can't find an available record, it will return `null`.
    *
    * @param offset         the offset to fetch.
    * @param untilOffset    the max offset to fetch. Exclusive.
    * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
+   *                       offset if available, or throw an exception. When `failOnDataLoss` is
+   *                       `false`, this method will return the record at offset if available, or
+   *                       return the record at the next earliest available offset that is less
+   *                       than untilOffset, or `null` otherwise.
    */
   def get(
       offset: Long,
@@ -298,9 +296,10 @@ private[kafka010] class KafkaDataConsumer(
       s"requested $offset")
 
     // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
-    // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
-    // we will move to the next available offset within `[offset, untilOffset)` and retry.
-    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+    // `false`, we will try to fetch the record at `offset`; if that record does not exist, we will
+    // try to fetch the next available record within [offset, untilOffset).
+    // If `failOnDataLoss` is `true`, the loop body will be executed only once, either returning the
+    // record at `offset` or throwing an exception when the record does not exist.
     var toFetchOffset = offset
     var fetchedRecord: FetchedRecord = null
     // We want to break out of the while loop on a successful fetch to avoid using "return"
@@ -452,7 +451,7 @@ private[kafka010] class KafkaDataConsumer(
   /**
    * Get the fetched record for the given offset if available.
    *
-   * If the record is invisible (either a  transaction message, or an aborted message when the
+   * If the record is invisible (either a transaction message, or an aborted message when the
    * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
    * next offset to fetch.
    *

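The "invisible" records mentioned in the last hunk come from Kafka's
transactional support. As an illustration only (the broker address and group
id are placeholders), this is how a consumer ends up in the `read_committed`
isolation level that the comment refers to, using standard kafka-clients APIs:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object ReadCommittedSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group")            // placeholder group id
        // With read_committed, poll() never returns records from aborted
        // transactions, and transaction markers occupy offsets that are never
        // returned at all; these are the "invisible" offsets the fetch loop
        // has to skip over.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        consumer.close() // real code would subscribe and poll in a loop
      }
    }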

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org