You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/06/07 12:27:25 UTC
[spark] branch master updated: [SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0965fa427a7 [SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
0965fa427a7 is described below
commit 0965fa427a70d2855945e2008ccdb86a4989d763
Author: azheng <az...@adobe.com>
AuthorDate: Tue Jun 7 21:27:07 2022 +0900
[SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
### What changes were proposed in this pull request?
Fixed some minor formatting issues in the code comments and rephrased some of them to make them clearer.
### Why are the changes needed?
Minor format correction and better readability
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Not needed, no real code changes
Closes #35484 from ArvinZheng/SPARK-38181.
Authored-by: azheng <az...@adobe.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/kafka010/consumer/KafkaDataConsumer.scala | 25 +++++++++++-----------
1 file changed, 12 insertions(+), 13 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 37fe38ea94e..d88e9821489 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -267,20 +267,18 @@ private[kafka010] class KafkaDataConsumer(
* within [offset, untilOffset).
*
* This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
- * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
- * method will try to fetch next available record within [offset, untilOffset).
- *
- * When this method tries to skip offsets due to either invisible messages or data loss and
- * reaches `untilOffset`, it will return `null`.
+ * throw an exception when it detects an unavailable offset. If `failOnDataLoss` is `false`, this
+ * method will try to fetch next available record within [offset, untilOffset). When this method
+ * reaches `untilOffset` and still can't find an available record, it will return `null`.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
- * offset if available, or throw exception.when `failOnDataLoss` is `false`,
- * this method will either return record at offset if available, or return
- * the next earliest available record less than untilOffset, or null. It
- * will not throw any exception.
+ * offset if available, or throw an exception. When `failOnDataLoss` is
+ * `false`, this method will return record at offset if available, or return
+ * the record at the next earliest available offset that is less than
+ * untilOffset, otherwise null.
*/
def get(
offset: Long,
@@ -298,9 +296,10 @@ private[kafka010] class KafkaDataConsumer(
s"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
- // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
- // we will move to the next available offset within `[offset, untilOffset)` and retry.
- // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+ // `false`, we will try to fetch the record at `offset`, if the record does not exist, we will
+ // try to fetch next available record within [offset, untilOffset).
+ // If `failOnDataLoss` is `true`, the loop body will be executed only once, either return the
+ // record at `offset` or throw an exception when the record does not exist.
var toFetchOffset = offset
var fetchedRecord: FetchedRecord = null
// We want to break out of the while loop on a successful fetch to avoid using "return"
@@ -452,7 +451,7 @@ private[kafka010] class KafkaDataConsumer(
/**
* Get the fetched record for the given offset if available.
*
- * If the record is invisible (either a transaction message, or an aborted message when the
+ * If the record is invisible (either a transaction message, or an aborted message when the
* consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
* next offset to fetch.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org