You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2023/01/19 10:02:16 UTC

[flink] branch master updated: [FLINK-30724][docs] Update kafka-per-partition watermark to new source

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a9ceb7cb03 [FLINK-30724][docs] Update kafka-per-partition watermark to new source
2a9ceb7cb03 is described below

commit 2a9ceb7cb0355f07ed6469b4071ecbcb0426daf0
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Jan 18 16:48:45 2023 +0800

    [FLINK-30724][docs] Update kafka-per-partition watermark to new source
---
 .../datastream/event-time/generating_watermarks.md | 51 ++++++++++++++--------
 .../datastream/event-time/generating_watermarks.md | 51 ++++++++++++++--------
 2 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index 12d84000304..f5f13d239f5 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -491,33 +491,46 @@ Python API 中尚不支持该特性。
 {{< tabs "bd763159-5532-4f69-ae15-a4836886e4fe" >}}
 {{< tab "Java" >}}
 ```java
-FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(
-        WatermarkStrategy
-                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
-
-DataStream<MyType> stream = env.addSource(kafkaSource);
+KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("my-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    .build();
+
+DataStream<String> stream = env.fromSource(
+    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(
-  WatermarkStrategy
-    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
-
-val stream: DataStream[MyType] = env.addSource(kafkaSource)
+val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
+    .setBootstrapServers(brokers)
+    .setTopics("my-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema)
+    .build()
+
+val stream = env.fromSource(
+    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
-
-stream = env
-    .add_source(kafka_source)
-    .assign_timestamps_and_watermarks(
-        WatermarkStrategy
-            .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+kafka_source = KafkaSource.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_topics("my-topic") \
+    .set_group_id("my-group") \
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .build()
+
+stream = env.from_source(
+    source=kafka_source,
+    watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
+    source_name="kafka_source")
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index a5d82970bd0..88c8488ba8c 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -564,33 +564,46 @@ case.
 {{< tabs "8c79e7ba-e4c4-4892-9aab-d2e958b75c0e" >}}
 {{< tab "Java" >}}
 ```java
-FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(
-        WatermarkStrategy
-                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
-
-DataStream<MyType> stream = env.addSource(kafkaSource);
+KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("my-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    .build();
+
+DataStream<String> stream = env.fromSource(
+    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(
-  WatermarkStrategy
-    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
-
-val stream: DataStream[MyType] = env.addSource(kafkaSource)
+val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
+    .setBootstrapServers(brokers)
+    .setTopics("my-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema)
+    .build()
+
+val stream = env.fromSource(
+    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
-
-stream = env
-    .add_source(kafka_source)
-    .assign_timestamps_and_watermarks(
-        WatermarkStrategy
-            .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+kafka_source = KafkaSource.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_topics("my-topic") \
+    .set_group_id("my-group") \
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .build()
+
+stream = env.from_source(
+    source=kafka_source,
+    watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
+    source_name="kafka_source")
 ```
 {{< /tab >}}
 {{< /tabs >}}