Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/16 02:15:05 UTC

[GitHub] [flink] imaffe commented on a change in pull request #19056: [FLINK-26028][Connector/pulsar] add sink documentation; change some source documentation

imaffe commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827566838



##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transactions in the Pulsar `broker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a 3-hour timeout by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
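+
+As a sketch (the generic `setConfig(option, value)` call on the source builder and the 30-minute value are assumptions for illustration), the option could be applied like this:
+
+```java
+// Sketch: shorten the transaction timeout to 30 minutes. The value must still be
+// greater than the checkpoint interval plus the maximum recovery time.
+PulsarSource.builder()
+    .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 30 * 60 * 1000L)
+```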
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+Messages are acknowledged immediately after they are consumed.
+However, we cannot guarantee consistency in this scenario.
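+
+A minimal sketch (again assuming the source builder's generic `setConfig(option, value)` method) of enabling this behavior:
+
+```java
+// Sketch: acknowledge messages immediately after they are consumed,
+// trading consistency for running without checkpoints or transactions.
+PulsarSource.builder()
+    .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
+```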
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+The Pulsar Sink supports writing records to one or more Pulsar topics, or to an explicit list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction`, or are on Flink 1.14 or earlier releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink) connector.
+{{< /hint >}}
+
+### Usage
+
+The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes String records to a Pulsar topic with the at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a `PulsarSink`:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write to, see [writing topics](#writing-topics) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in the Pulsar Sink by `setProducerName(String)`.
+This gives the Flink connector a unique name in the Pulsar statistics dashboard
+and provides a way to monitor the performance of your Flink applications.
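+
+For example (the producer name below is purely illustrative):
+
+```java
+// Sketch: a stable, descriptive producer name makes this connector easy to
+// identify in the Pulsar statistics dashboard.
+PulsarSink.builder()
+    .setProducerName("flink-orders-sink")
+```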
+
+### Writing Topics
+
+Setting the topics to write to is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. The Pulsar sink supports a mixed style of topic setting: you can provide a list of topics,
+partitions, or both.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support automatic partition discovery. The Pulsar sink queries the topic metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
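+
+As a sketch (assuming the sink builder exposes a generic `setConfig(option, value)` method; the 10-minute value is only illustrative):
+
+```java
+// Sketch: refresh the topic metadata, and thereby discover new partitions,
+// every 10 minutes instead of the default interval.
+PulsarSink.builder()
+    .setConfig(PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL, 10 * 60 * 1000L)
+```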
+
+Configuring the writing targets can be replaced by using a custom `TopicRouter` with
+[message routing](#message-routing). Read [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink with both a topic and some of its partitions, the Pulsar sink merges them and only uses the topic.
+
+For example, if you use `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
+Similar to `PulsarSource`, the Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. However, Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
+
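+The two supported flavors could be constructed roughly as follows (a sketch; the `pulsarSchema` factory is assumed to mirror the `flinkSchema` factory used above):
+
+```java
+// Sketch: wrap a Flink SerializationSchema.
+PulsarSerializationSchema.flinkSchema(new SimpleStringSchema())
+
+// Sketch: wrap a Pulsar Schema. Schema.AUTO_PRODUCE_BYTES() is not supported here.
+PulsarSerializationSchema.pulsarSchema(Schema.STRING)
+```
+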
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,

Review comment:
       This is my mistake, though this refers to a concept that is pretty stable. I'll change it to use the latest stable version.
   
   



