You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2023/01/19 10:02:16 UTC
[flink] branch master updated: [FLINK-30724][docs] Update kafka-per-partition watermark to new source
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2a9ceb7cb03 [FLINK-30724][docs] Update kafka-per-partition watermark to new source
2a9ceb7cb03 is described below
commit 2a9ceb7cb0355f07ed6469b4071ecbcb0426daf0
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Jan 18 16:48:45 2023 +0800
[FLINK-30724][docs] Update kafka-per-partition watermark to new source
---
.../datastream/event-time/generating_watermarks.md | 51 ++++++++++++++--------
.../datastream/event-time/generating_watermarks.md | 51 ++++++++++++++--------
2 files changed, 64 insertions(+), 38 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index 12d84000304..f5f13d239f5 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -491,33 +491,46 @@ Python API 中尚不支持该特性。
{{< tabs "bd763159-5532-4f69-ae15-a4836886e4fe" >}}
{{< tab "Java" >}}
```java
-FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
-
-DataStream<MyType> stream = env.addSource(kafkaSource);
+KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+ .setBootstrapServers(brokers)
+ .setTopics("my-topic")
+ .setGroupId("my-group")
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+DataStream<String> stream = env.fromSource(
+ kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
-
-val stream: DataStream[MyType] = env.addSource(kafkaSource)
+val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
+ .setBootstrapServers(brokers)
+ .setTopics("my-topic")
+ .setGroupId("my-group")
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema)
+ .build()
+
+val stream = env.fromSource(
+ kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
-kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
-
-stream = env
- .add_source(kafka_source)
- .assign_timestamps_and_watermarks(
- WatermarkStrategy
- .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+kafka_source = KafkaSource.builder()
+ .set_bootstrap_servers(brokers)
+ .set_topics("my-topic")
+ .set_group_id("my-group")
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .set_value_only_deserializer(SimpleStringSchema())
+ .build()
+
+stream = env.from_source(
+ source=kafka_source,
+ watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
+ source_name="kafka_source")
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index a5d82970bd0..88c8488ba8c 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -564,33 +564,46 @@ case.
{{< tabs "8c79e7ba-e4c4-4892-9aab-d2e958b75c0e" >}}
{{< tab "Java" >}}
```java
-FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
-
-DataStream<MyType> stream = env.addSource(kafkaSource);
+KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+ .setBootstrapServers(brokers)
+ .setTopics("my-topic")
+ .setGroupId("my-group")
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+DataStream<String> stream = env.fromSource(
+ kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
-
-val stream: DataStream[MyType] = env.addSource(kafkaSource)
+val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
+ .setBootstrapServers(brokers)
+ .setTopics("my-topic")
+ .setGroupId("my-group")
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema)
+ .build()
+
+val stream = env.fromSource(
+ kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
-kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
-
-stream = env
- .add_source(kafka_source)
- .assign_timestamps_and_watermarks(
- WatermarkStrategy
- .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+kafka_source = KafkaSource.builder()
+ .set_bootstrap_servers(brokers)
+ .set_topics("my-topic")
+ .set_group_id("my-group")
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .set_value_only_deserializer(SimpleStringSchema())
+ .build()
+
+stream = env.from_source(
+ source=kafka_source,
+ watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(20)),
+ source_name="kafka_source")
```
{{< /tab >}}
{{< /tabs >}}