You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/07/11 01:55:56 UTC

[flink] branch master updated: [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce56e01aeaa [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
ce56e01aeaa is described below

commit ce56e01aeaa6ebd3b350fe089a952f238742823c
Author: fanrui <19...@gmail.com>
AuthorDate: Fri Jul 8 13:09:22 2022 +0800

    [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
---
 docs/content.zh/docs/connectors/datastream/kafka.md                   | 4 ++--
 docs/content/docs/connectors/datastream/kafka.md                      | 4 ++--
 .../kafka/source/enumerator/initializer/OffsetsInitializer.java       | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index c5e8d684eb2..7ec1d2e1c2c 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -114,8 +114,8 @@ KafkaSource.builder()
     .setStartingOffsets(OffsetsInitializer.committedOffsets())
     // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-    // 从时间戳大于等于指定时间的数据开始消费
-    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
+    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
     // 从最早位点开始消费
     .setStartingOffsets(OffsetsInitializer.earliest())
     // 从最末尾位点开始消费
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index c32a823c581..ff3f37ade25 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -116,8 +116,8 @@ KafkaSource.builder()
     .setStartingOffsets(OffsetsInitializer.committedOffsets())
     // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-    // Start from the first record whose timestamp is greater than or equals a timestamp
-    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // Start from the first record whose timestamp is greater than or equal to the given timestamp (in milliseconds)
+    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
     // Start from earliest offset
     .setStartingOffsets(OffsetsInitializer.earliest())
     // Start from latest offset
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 6de272da4b3..5a1bac5cf8d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -125,9 +125,9 @@ public interface OffsetsInitializer extends Serializable {
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
      * initialized offset is the offset of the first record whose record timestamp is greater than
-     * or equals the give timestamp.
+     * or equal to the given timestamp (milliseconds).
      *
-     * @param timestamp the timestamp to start the consumption.
+     * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)