You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/07/11 01:55:56 UTC
[flink] branch master updated: [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ce56e01aeaa [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
ce56e01aeaa is described below
commit ce56e01aeaa6ebd3b350fe089a952f238742823c
Author: fanrui <19...@gmail.com>
AuthorDate: Fri Jul 8 13:09:22 2022 +0800
[FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
---
docs/content.zh/docs/connectors/datastream/kafka.md | 4 ++--
docs/content/docs/connectors/datastream/kafka.md | 4 ++--
.../kafka/source/enumerator/initializer/OffsetsInitializer.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index c5e8d684eb2..7ec1d2e1c2c 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -114,8 +114,8 @@ KafkaSource.builder()
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- // 从时间戳大于等于指定时间的数据开始消费
- .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+ // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
+ .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// 从最早位点开始消费
.setStartingOffsets(OffsetsInitializer.earliest())
// 从最末尾位点开始消费
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index c32a823c581..ff3f37ade25 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -116,8 +116,8 @@ KafkaSource.builder()
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- // Start from the first record whose timestamp is greater than or equals a timestamp
- .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+ // Start from the first record whose timestamp is greater than or equals a timestamp (milliseconds)
+ .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// Start from earliest offset
.setStartingOffsets(OffsetsInitializer.earliest())
// Start from latest offset
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 6de272da4b3..5a1bac5cf8d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -125,9 +125,9 @@ public interface OffsetsInitializer extends Serializable {
/**
* Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
* initialized offset is the offset of the first record whose record timestamp is greater than
- * or equals the give timestamp.
+ * or equals the give timestamp (milliseconds).
*
- * @param timestamp the timestamp to start the consumption.
+ * @param timestamp the timestamp (milliseconds) to start the consumption.
* @return an {@link OffsetsInitializer} which initializes the offsets based on the given
* timestamp.
* @see KafkaAdminClient#listOffsets(Map)