You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:08 UTC
[flink] 06/09: [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 136add5d0c9c5b9b2869a9ee194f78449065b18e
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Feb 15 22:22:19 2022 +0800
[FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink.
---
.../common/utils/PulsarTransactionUtils.java | 68 ++++
.../flink/connector/pulsar/sink/PulsarSink.java | 136 ++++++++
.../connector/pulsar/sink/PulsarSinkBuilder.java | 354 +++++++++++++++++++++
.../connector/pulsar/sink/PulsarSinkOptions.java | 14 +-
.../pulsar/sink/committer/PulsarCommittable.java | 71 +++++
.../committer/PulsarCommittableSerializer.java | 65 ++++
.../pulsar/sink/committer/PulsarCommitter.java | 174 ++++++++++
.../pulsar/sink/config/SinkConfiguration.java | 17 +-
.../connector/pulsar/sink/writer/PulsarWriter.java | 264 +++++++++++++++
.../sink/writer/context/PulsarSinkContext.java | 46 +++
.../sink/writer/context/PulsarSinkContextImpl.java | 61 ++++
.../sink/writer/router/KeyHashTopicRouter.java | 71 +++++
.../pulsar/sink/writer/router/MessageKeyHash.java | 85 +++++
.../sink/writer/router/RoundRobinTopicRouter.java | 63 ++++
.../pulsar/sink/writer/router/TopicRouter.java | 64 ++++
.../sink/writer/router/TopicRoutingMode.java | 87 +++++
.../sink/writer/topic/TopicMetadataListener.java | 173 ++++++++++
.../sink/writer/topic/TopicProducerRegister.java | 202 ++++++++++++
18 files changed, 2011 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
new file mode 100644
index 0000000..a48b4d4
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+
+/** A suite of workarounds for Pulsar transactions. */
+@Internal
+public final class PulsarTransactionUtils {
+
+ private PulsarTransactionUtils() {
+ // No public constructor
+ }
+
+ /** Create transaction with given timeout millis. */
+ public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
+ try {
+ CompletableFuture<Transaction> future =
+ sneakyClient(pulsarClient::newTransaction)
+ .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+ .build();
+
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ /**
+ * Works around a bug in {@link TransactionCoordinatorClientException#unwrap(Throwable)}:
+ * Pulsar wraps the {@link ExecutionException}, which hides the real execution exception.
+ */
+ public static TransactionCoordinatorClientException unwrap(
+ TransactionCoordinatorClientException e) {
+ return findThrowable(e.getCause(), TransactionCoordinatorClientException.class).orElse(e);
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
new file mode 100644
index 0000000..811d5b5
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Sink implementation of Pulsar. Please use a {@link PulsarSinkBuilder} to construct a {@link
+ * PulsarSink}. The following example shows how to create a PulsarSink receiving records of {@code
+ * String} type.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .build();
+ * }</pre>
+ *
+ * <p>The sink supports all delivery guarantees described by {@link DeliveryGuarantee}.
+ *
+ * <ul>
+ * <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in
+ * case of issues on the Pulsar broker and messages may be duplicated in case of a Flink
+ * failure.
+ * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in
+ * the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages
+ * will be lost in case of any issue with the Pulsar brokers but messages may be duplicated
+ * when Flink restarts.
+ * <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: In this mode the PulsarSink will write all messages
+ * in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no
+ * duplicates will be seen in case of a Flink restart. However, this delays record writing
+ * effectively until a checkpoint is written, so adjust the checkpoint duration accordingly.
+ * Additionally, it is highly recommended to set the Pulsar transaction timeout far above
+ * the maximum checkpoint duration plus the maximum restart duration; otherwise data loss
+ * may happen when Pulsar expires an uncommitted transaction.
+ * </ul>
+ *
+ * <p>See {@link PulsarSinkBuilder} for more details.
+ *
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommittable> {
+ private static final long serialVersionUID = 4416714587951282119L;
+
+ private final SinkConfiguration sinkConfiguration;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicMetadataListener metadataListener;
+ private final TopicRouter<IN> topicRouter;
+
+ PulsarSink(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ TopicRoutingMode topicRoutingMode,
+ TopicRouter<IN> topicRouter) {
+ this.sinkConfiguration = checkNotNull(sinkConfiguration);
+ this.serializationSchema = checkNotNull(serializationSchema);
+ this.metadataListener = checkNotNull(metadataListener);
+ checkNotNull(topicRoutingMode);
+
+ // Select the topic router that matches the routing mode.
+ if (topicRoutingMode == TopicRoutingMode.CUSTOM) {
+ this.topicRouter = checkNotNull(topicRouter);
+ } else if (topicRoutingMode == TopicRoutingMode.ROUND_ROBIN) {
+ this.topicRouter = new RoundRobinTopicRouter<>(sinkConfiguration);
+ } else {
+ this.topicRouter = new KeyHashTopicRouter<>(sinkConfiguration);
+ }
+ }
+
+ /**
+ * Create a {@link PulsarSinkBuilder} to construct a new {@link PulsarSink}.
+ *
+ * @param <IN> Type of incoming records.
+ * @return A Pulsar sink builder.
+ */
+ public static <IN> PulsarSinkBuilder<IN> builder() {
+ return new PulsarSinkBuilder<>();
+ }
+
+ @Internal
+ @Override
+ public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext) {
+ return new PulsarWriter<>(
+ sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext);
+ }
+
+ @Internal
+ @Override
+ public Committer<PulsarCommittable> createCommitter() {
+ return new PulsarCommitter(sinkConfiguration);
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer<PulsarCommittable> getCommittableSerializer() {
+ return new PulsarCommittableSerializer();
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
new file mode 100644
index 0000000..a0352f5
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link
+ * PulsarSink}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSink that writes String
+ * values to a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, and the record serializer are required fields that must be set. If
+ * you don't set the topics, make sure you have provided a custom {@link TopicRouter}. Otherwise,
+ * you must provide the topics to produce.
+ *
+ * <p>To specify the delivery guarantees of PulsarSink, one can call {@link
+ * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link
+ * DeliveryGuarantee#NONE}, which makes no consistency promise when writing messages to
+ * Pulsar.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .setDeliveryGuarantee(deliveryGuarantee)
+ * .build();
+ * }</pre>
+ *
+ * @see PulsarSink for a more detailed explanation of the different guarantees.
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSinkBuilder<IN> {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+ private final PulsarConfigBuilder configBuilder;
+
+ private PulsarSerializationSchema<IN> serializationSchema;
+ private TopicMetadataListener metadataListener;
+ private TopicRoutingMode topicRoutingMode;
+ private TopicRouter<IN> topicRouter;
+
+ // Package-private constructor; use PulsarSink.builder() to obtain a builder.
+ PulsarSinkBuilder() {
+ this.configBuilder = new PulsarConfigBuilder();
+ }
+
+ /**
+ * Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+ *
+ * @param adminUrl The url for the PulsarAdmin.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
+ return setConfig(PULSAR_ADMIN_URL, adminUrl);
+ }
+
+ /**
+ * Sets the service URL for the Pulsar producer of the PulsarSink.
+ *
+ * @param serviceUrl The server url of the Pulsar cluster.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setServiceUrl(String serviceUrl) {
+ return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+ }
+
+ /**
+ * The producer name is informative, and it can be used to identify a particular producer
+ * instance from the topic stats.
+ *
+ * @param producerName The name of the producer used in Pulsar sink.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setProducerName(String producerName) {
+ return setConfig(PULSAR_PRODUCER_NAME, producerName);
+ }
+
+ /**
+ * Set a Pulsar topic list for the Flink sink. Some topics may not exist yet; writing to such
+ * a non-existent topic wouldn't throw any exception.
+ *
+ * @param topics The topic list you would like to produce messages to.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setTopics(String... topics) {
+ return setTopics(Arrays.asList(topics));
+ }
+
+ /**
+ * Set a Pulsar topic list for the Flink sink. Some topics may not exist yet; writing to such
+ * a non-existent topic wouldn't throw any exception.
+ *
+ * @param topics The topic list you would like to produce messages to.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setTopics(List<String> topics) {
+ checkState(metadataListener == null, "setTopics couldn't be set twice.");
+ // Making sure the topic should be distinct.
+ List<String> topicSet = distinctTopics(topics);
+ this.metadataListener = new TopicMetadataListener(topicSet);
+ return this;
+ }
+
+ /**
+ * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+ * DeliveryGuarantee#NONE}.
+ *
+ * @param deliveryGuarantee The delivery guarantee to use.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+ checkNotNull(deliveryGuarantee, "deliveryGuarantee");
+ configBuilder.override(PULSAR_WRITE_DELIVERY_GUARANTEE, deliveryGuarantee);
+ return this;
+ }
+
+ /**
+ * Set a routing mode for choosing the right topic partition to send messages to.
+ *
+ * @param topicRoutingMode Routing policy for choosing the desired topic.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setTopicRoutingMode(TopicRoutingMode topicRoutingMode) {
+ checkArgument(
+ topicRoutingMode != TopicRoutingMode.CUSTOM,
+ "CUSTOM mode should be set by using setTopicRouter method.");
+ this.topicRoutingMode = checkNotNull(topicRoutingMode, "topicRoutingMode");
+ return this;
+ }
+
+ /**
+ * Use a custom topic router instead of the predefined topic routing.
+ *
+ * @param topicRouter The router for choosing topic to send message.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setTopicRouter(TopicRouter<IN> topicRouter) {
+ if (topicRoutingMode != null && topicRoutingMode != TopicRoutingMode.CUSTOM) {
+ LOG.warn("We would override topicRoutingMode to CUSTOM if you provide TopicRouter.");
+ }
+ this.topicRoutingMode = TopicRoutingMode.CUSTOM;
+ this.topicRouter = checkNotNull(topicRouter, "topicRouter");
+ return this;
+ }
+
+ /**
+ * Sets the {@link PulsarSerializationSchema} that transforms incoming records to bytes.
+ *
+ * @param serializationSchema Pulsar-specific serialization logic.
+ * @return this PulsarSinkBuilder.
+ */
+ public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+ PulsarSerializationSchema<T> serializationSchema) {
+ PulsarSinkBuilder<T> self = specialized();
+ self.serializationSchema = serializationSchema;
+ return self;
+ }
+
+ /**
+ * If you enable this option, we would serialize and send the message by using Pulsar
+ * {@link Schema}.
+ *
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> enableSchemaEvolution() {
+ configBuilder.override(PULSAR_WRITE_SCHEMA_EVOLUTION, true);
+ return this;
+ }
+
+ /**
+ * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
+ * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+ *
+ * <p>Make sure the option is set only once, or always with the same value.
+ *
+ * @param key The key of the property.
+ * @param value The value of the property.
+ * @return this PulsarSinkBuilder.
+ */
+ public <T> PulsarSinkBuilder<IN> setConfig(ConfigOption<T> key, T value) {
+ configBuilder.set(key, value);
+ return this;
+ }
+
+ /**
+ * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found
+ * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+ *
+ * @param config The config to set for the PulsarSink.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setConfig(Configuration config) {
+ configBuilder.set(config);
+ return this;
+ }
+
+ /**
+ * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found
+ * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+ *
+ * <p>This method is mainly used for future flink SQL binding.
+ *
+ * @param properties The config properties to set for the PulsarSink.
+ * @return this PulsarSinkBuilder.
+ */
+ public PulsarSinkBuilder<IN> setProperties(Properties properties) {
+ configBuilder.set(properties);
+ return this;
+ }
+
+ /**
+ * Build the {@link PulsarSink}.
+ *
+ * @return a PulsarSink with the settings made for this builder.
+ */
+ public PulsarSink<IN> build() {
+ // Change delivery guarantee.
+ DeliveryGuarantee deliveryGuarantee = configBuilder.get(PULSAR_WRITE_DELIVERY_GUARANTEE);
+ if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+ LOG.warn(
+ "You haven't set delivery guarantee or set it to NONE, this would cause data loss. Make sure you have known this shortcoming.");
+ } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+ LOG.info(
+ "Exactly once require flink checkpoint and your pulsar cluster should support the transaction.");
+ configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
+ configBuilder.override(PULSAR_SEND_TIMEOUT_MS, 0L);
+
+ if (!configBuilder.contains(PULSAR_WRITE_TRANSACTION_TIMEOUT)) {
+ LOG.warn(
+ "The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval.");
+ } else {
+ Long timeout = configBuilder.get(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+ LOG.warn(
+ "The configured transaction timeout is {} mille seconds, make sure it was greater than your checkpoint interval.",
+ timeout);
+ }
+ }
+
+ if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
+ LOG.warn(
+ "We recommend set a readable producer name through setProducerName(String) in production mode.");
+ }
+
+ checkNotNull(serializationSchema, "serializationSchema must be set.");
+ if (serializationSchema instanceof PulsarSchemaWrapper
+ && !Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
+ LOG.info(
+ "It seems like you want to send message in Pulsar Schema."
+ + " You can enableSchemaEvolution for using this feature."
+ + " We would use Schema.BYTES as the default schema if you don't enable this option.");
+ }
+
+ // Topic metadata listener validation.
+ if (metadataListener == null) {
+ if (topicRouter == null) {
+ throw new NullPointerException(
+ "No topic names or custom topic router are provided.");
+ } else {
+ LOG.warn(
+ "No topic set has been provided, make sure your custom topic router support empty topic set.");
+ this.metadataListener = new TopicMetadataListener();
+ }
+ }
+
+ // Topic routing mode validate.
+ if (topicRoutingMode == null) {
+ LOG.info("No topic routing mode has been chosen. We use round-robin mode as default.");
+ this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
+ }
+
+ // This is an unmodifiable configuration for Pulsar.
+ // We don't use Pulsar's built-in configuration classes for compatibility reasons.
+ SinkConfiguration sinkConfiguration =
+ configBuilder.build(SINK_CONFIG_VALIDATOR, SinkConfiguration::new);
+
+ return new PulsarSink<>(
+ sinkConfiguration,
+ serializationSchema,
+ metadataListener,
+ topicRoutingMode,
+ topicRouter);
+ }
+
+ // ------------- private helpers --------------
+
+ /** Helper method that lets the Java compiler recognize the generic type. */
+ @SuppressWarnings("unchecked")
+ private <T extends IN> PulsarSinkBuilder<T> specialized() {
+ return (PulsarSinkBuilder<T>) this;
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index 0e16830..3a7c5bc 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash;
import org.apache.pulsar.client.api.CompressionType;
@@ -38,12 +39,13 @@ import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+import static org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash.MURMUR3_32_HASH;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
/**
- * Configurations for PulsarSink. All the options list here could be configured in {@code
+ * Configurations for PulsarSink. All the options list here could be configured in {@link
* PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
* for pulsar source.
*
@@ -99,6 +101,13 @@ public final class PulsarSinkOptions {
.withDescription(
"Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.");
+ public static final ConfigOption<MessageKeyHash> PULSAR_MESSAGE_KEY_HASH =
+ ConfigOptions.key(SINK_CONFIG_PREFIX + "messageKeyHash")
+ .enumType(MessageKeyHash.class)
+ .defaultValue(MURMUR3_32_HASH)
+ .withDescription(
+ "The hash policy for routing message by calculating the hash code of message key.");
+
public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION =
ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution")
.booleanType()
@@ -106,7 +115,8 @@ public final class PulsarSinkOptions {
.withDescription(
Description.builder()
.text(
- "If you enable this option, we would consume and deserialize the message by using Pulsar's %s.",
+ "If you enable this option and use PulsarSerializationSchema.pulsarSchema(),"
+ + " we would consume and deserialize the message by using Pulsar's %s.",
code("Schema"))
.build());
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java
new file mode 100644
index 0000000..cca8e80
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import java.util.Objects;
+
+/** The writer state for the Pulsar connector. It is used by the Pulsar committer. */
+@Internal
+public class PulsarCommittable {
+
+ /** The transaction id. */
+ private final TxnID txnID;
+
+ /** The topic name with partition information. */
+ private final String topic;
+
+ public PulsarCommittable(TxnID txnID, String topic) {
+ this.txnID = txnID;
+ this.topic = topic;
+ }
+
+ public TxnID getTxnID() {
+ return txnID;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PulsarCommittable that = (PulsarCommittable) o;
+ return Objects.equals(txnID, that.txnID) && Objects.equals(topic, that.topic);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(txnID, topic);
+ }
+
+ @Override
+ public String toString() {
+ return "PulsarCommittable{" + "txnID=" + txnID + ", topic='" + topic + '\'' + '}';
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java
new file mode 100644
index 0000000..324a7c6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Converts a {@link PulsarCommittable} to and from its binary form.
+ *
+ * <p>The layout is: the most significant bits of the transaction id (long), the least significant
+ * bits of the transaction id (long), then the topic written with {@link
+ * DataOutputStream#writeUTF(String)}.
+ */
+public class PulsarCommittableSerializer implements SimpleVersionedSerializer<PulsarCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /** Writes the transaction id bits followed by the topic into a fresh byte array. */
+    @Override
+    public byte[] serialize(PulsarCommittable obj) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream output = new DataOutputStream(buffer)) {
+            TxnID id = obj.getTxnID();
+            output.writeLong(id.getMostSigBits());
+            output.writeLong(id.getLeastSigBits());
+            output.writeUTF(obj.getTopic());
+            output.flush();
+        }
+        return buffer.toByteArray();
+    }
+
+    /** Reads the fields back in the order they were written. The version is not inspected. */
+    @Override
+    public PulsarCommittable deserialize(int version, byte[] serialized) throws IOException {
+        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(serialized))) {
+            long high = input.readLong();
+            long low = input.readLong();
+            String topic = input.readUTF();
+            return new PulsarCommittable(new TxnID(high, low), topic);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
new file mode 100644
index 0000000..8389bdc
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/**
+ * Committer implementation for {@link PulsarSink}.
+ *
+ * <p>The committer is responsible to finalize the Pulsar transactions by committing them. The
+ * Pulsar client and its transaction coordinator client are created lazily on the first call to
+ * {@link #commit(Collection)} and released in {@link #close()}.
+ */
+@Internal
+public class PulsarCommitter implements Committer<PulsarCommittable>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+
+    /** Lazily created client; stays {@code null} until the first commit request arrives. */
+    private PulsarClient pulsarClient;
+
+    /** Transaction coordinator client obtained from {@link #pulsarClient}. */
+    private TransactionCoordinatorClient coordinatorClient;
+
+    public PulsarCommitter(SinkConfiguration sinkConfiguration) {
+        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+    }
+
+    /**
+     * Commits the transaction of every request and reports the outcome on the request itself.
+     *
+     * <ul>
+     *   <li>{@link CoordinatorNotFoundException} and {@link MetaStoreHandlerNotExistsException}
+     *       are signalled as failures with a known reason.
+     *   <li>{@link InvalidTxnStatusException} is signalled as already committed.
+     *   <li>{@link TransactionNotFoundException} is a known failure on the first attempt and is
+     *       treated as already committed on any retry.
+     *   <li>Any other {@link TransactionCoordinatorClientException} is retried until {@link
+     *       SinkConfiguration#getMaxRecommitTimes()} is reached, then signalled as a known
+     *       failure.
+     *   <li>Any other exception is signalled as a failure with an unknown reason.
+     * </ul>
+     *
+     * @param requests the commit requests to process.
+     */
+    @Override
+    public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
+            throws IOException, InterruptedException {
+        TransactionCoordinatorClient client = transactionCoordinatorClient();
+
+        for (CommitRequest<PulsarCommittable> request : requests) {
+            PulsarCommittable committable = request.getCommittable();
+            TxnID txnID = committable.getTxnID();
+            String topic = committable.getTopic();
+
+            LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic);
+            try {
+                client.commit(txnID);
+            } catch (TransactionCoordinatorClientException e) {
+                // This is a known bug for Pulsar Transaction.
+                // We have to use instanceof instead of catching them.
+                TransactionCoordinatorClientException ex = PulsarTransactionUtils.unwrap(e);
+                if (ex instanceof CoordinatorNotFoundException) {
+                    LOG.error(
+                            "We couldn't find the Transaction Coordinator from Pulsar broker {}. "
+                                    + "Check your broker configuration.",
+                            committable,
+                            ex);
+                    request.signalFailedWithKnownReason(ex);
+                } else if (ex instanceof InvalidTxnStatusException) {
+                    LOG.error(
+                            "Unable to commit transaction ({}) because it's in an invalid state. "
+                                    + "Most likely the transaction has been aborted for some reason. "
+                                    + "Please check the Pulsar broker logs for more details.",
+                            committable,
+                            ex);
+                    request.signalAlreadyCommitted();
+                } else if (ex instanceof TransactionNotFoundException) {
+                    if (request.getNumberOfRetries() == 0) {
+                        LOG.error(
+                                "Unable to commit transaction ({}) because it's not found on Pulsar broker. "
+                                        + "Most likely the checkpoint interval exceed the transaction timeout.",
+                                committable,
+                                ex);
+                        request.signalFailedWithKnownReason(ex);
+                    } else {
+                        LOG.warn(
+                                "We can't find the transaction {} after {} retry committing. "
+                                        + "This may mean that the transaction have been committed in previous but failed with timeout. "
+                                        + "So we just mark it as committed.",
+                                txnID,
+                                request.getNumberOfRetries());
+                        request.signalAlreadyCommitted();
+                    }
+                } else if (ex instanceof MetaStoreHandlerNotExistsException) {
+                    LOG.error(
+                            "We can't find the meta store handler by the mostSigBits from TxnID {}. "
+                                    + "Did you change the metadata for topic {}?",
+                            committable,
+                            TRANSACTION_COORDINATOR_ASSIGN,
+                            ex);
+                    request.signalFailedWithKnownReason(ex);
+                } else {
+                    LOG.error(
+                            "Encountered retriable exception while committing transaction {} for topic {}.",
+                            committable,
+                            topic,
+                            ex);
+                    int maxRecommitTimes = sinkConfiguration.getMaxRecommitTimes();
+                    if (request.getNumberOfRetries() < maxRecommitTimes) {
+                        request.retryLater();
+                    } else {
+                        String message =
+                                String.format(
+                                        "Failed to commit transaction %s after retrying %d times",
+                                        txnID, maxRecommitTimes);
+                        request.signalFailedWithKnownReason(new FlinkRuntimeException(message, ex));
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction ({}) encountered unknown error and data could be potentially lost.",
+                        committable,
+                        e);
+                request.signalFailedWithUnknownReason(e);
+            }
+        }
+    }
+
+    /**
+     * Lazy initialize this backend Pulsar client. This committer may not be used in {@link
+     * DeliveryGuarantee#NONE} and {@link DeliveryGuarantee#AT_LEAST_ONCE}. So we couldn't create
+     * the Pulsar client at first.
+     *
+     * @return the transaction coordinator client of the backing Pulsar client.
+     * @throws NullPointerException if the Pulsar client has no transaction coordinator client.
+     */
+    private TransactionCoordinatorClient transactionCoordinatorClient() {
+        if (coordinatorClient == null) {
+            // Reuse a client left over from an earlier failed attempt instead of creating
+            // another one, otherwise the earlier client would never be closed.
+            if (pulsarClient == null) {
+                this.pulsarClient = createClient(sinkConfiguration);
+            }
+            this.coordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
+
+            // Ensure you have enabled transaction.
+            checkNotNull(coordinatorClient, "You haven't enable transaction in Pulsar client.");
+        }
+
+        return coordinatorClient;
+    }
+
+    /** Closes the Pulsar client if one has been created. */
+    @Override
+    public void close() throws IOException {
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index e0ef7ff..fe1204e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.connector.sink.Sink.InitContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
import org.apache.pulsar.client.api.Schema;
@@ -31,6 +34,7 @@ import java.util.Objects;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
@@ -45,6 +49,7 @@ public class SinkConfiguration extends PulsarConfiguration {
private final long transactionTimeoutMillis;
private final long topicMetadataRefreshInterval;
private final int partitionSwitchSize;
+ private final MessageKeyHash messageKeyHash;
private final boolean enableSchemaEvolution;
private final int maxPendingMessages;
private final int maxRecommitTimes;
@@ -56,12 +61,13 @@ public class SinkConfiguration extends PulsarConfiguration {
this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+ this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
}
- /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */
+ /** The delivery guarantee changes the behavior of {@link PulsarWriter}. */
public DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}
@@ -92,9 +98,14 @@ public class SinkConfiguration extends PulsarConfiguration {
return partitionSwitchSize;
}
+ /** The message key's hash logic for routing the message into one Pulsar partition. */
+ public MessageKeyHash getMessageKeyHash() {
+ return messageKeyHash;
+ }
+
/**
* If we should serialize and send the message with a specified Pulsar {@link Schema} instead
- * the default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}.
+ * the default {@link Schema#BYTES}. This switch is only used for {@link PulsarSchemaWrapper}.
*/
public boolean isEnableSchemaEvolution() {
return enableSchemaEvolution;
@@ -129,6 +140,7 @@ public class SinkConfiguration extends PulsarConfiguration {
&& topicMetadataRefreshInterval == that.topicMetadataRefreshInterval
&& partitionSwitchSize == that.partitionSwitchSize
&& enableSchemaEvolution == that.enableSchemaEvolution
+ && messageKeyHash == that.messageKeyHash
&& maxPendingMessages == that.maxPendingMessages
&& maxRecommitTimes == that.maxRecommitTimes;
}
@@ -140,6 +152,7 @@ public class SinkConfiguration extends PulsarConfiguration {
transactionTimeoutMillis,
topicMetadataRefreshInterval,
partitionSwitchSize,
+ messageKeyHash,
enableSchemaEvolution,
maxPendingMessages,
maxRecommitTimes);
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
new file mode 100644
index 0000000..9b3c931
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * <p>With {@link DeliveryGuarantee#NONE} a record is sent asynchronously and the result is not
+ * checked. With any other guarantee the send is scheduled on the {@link MailboxExecutor} and the
+ * number of scheduled-but-unfinished sends is limited by {@link
+ * SinkConfiguration#getMaxPendingMessages()}.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicMetadataListener metadataListener;
+ private final TopicRouter<IN> topicRouter;
+ private final DeliveryGuarantee deliveryGuarantee;
+ private final PulsarSinkContext sinkContext;
+ private final MailboxExecutor mailboxExecutor;
+ private final TopicProducerRegister producerRegister;
+
+ // Incremented in requirePermits() and decremented in releasePermits(); i.e. the number of
+ // sends that acquired a permit and have not completed yet.
+ private long pendingMessages = 0;
+
+ /**
+ * Constructor creating a Pulsar writer.
+ *
+ * <p>It opens, in this order, the topic metadata listener, the topic router and the
+ * serialization schema, and only then creates the producer register.
+ *
+ * <p>It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration The configuration to configure the Pulsar producer.
+ * @param serializationSchema Transform the incoming records into different message properties.
+ * @param metadataListener The listener for querying topic metadata.
+ * @param topicRouter Topic router to choose topic by incoming records.
+ * @param initContext Context to provide information about the runtime environment.
+ */
+ public PulsarWriter(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ TopicRouter<IN> topicRouter,
+ InitContext initContext) {
+ this.sinkConfiguration = checkNotNull(sinkConfiguration);
+ this.serializationSchema = checkNotNull(serializationSchema);
+ this.metadataListener = checkNotNull(metadataListener);
+ this.topicRouter = checkNotNull(topicRouter);
+ checkNotNull(initContext);
+
+ this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+ this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
+ this.mailboxExecutor = initContext.getMailboxExecutor();
+
+ // Initialize topic metadata listener.
+ LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+ ProcessingTimeService timeService = initContext.getProcessingTimeService();
+ this.metadataListener.open(sinkConfiguration, timeService);
+
+ // Initialize topic router.
+ this.topicRouter.open(sinkConfiguration);
+
+ // Initialize the serialization schema.
+ try {
+ InitializationContext initializationContext =
+ initContext.asSerializationSchemaInitializationContext();
+ this.serializationSchema.open(initializationContext, sinkContext, sinkConfiguration);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Cannot initialize schema.", e);
+ }
+
+ // Create this producer register after opening serialization schema!
+ this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+ }
+
+ /**
+ * Serializes the element, routes it to one of the currently available topics and sends it.
+ *
+ * @param element The record to write.
+ * @param context The writer context; its timestamp is used as event time when the serialized
+ * message carries none.
+ */
+ @Override
+ public void write(IN element, Context context) throws IOException, InterruptedException {
+ PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);
+
+ // Choose the right topic to send.
+ String key = message.getKey();
+ List<String> availableTopics = metadataListener.availableTopics();
+ String topic = topicRouter.route(element, key, availableTopics, sinkContext);
+
+ // Create message builder for sending message.
+ TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
+
+ // Perform message sending.
+ if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+ // We would just ignore the sending exception. This may cause data loss.
+ builder.sendAsync();
+ } else {
+ // Waiting for permits to write message.
+ requirePermits();
+ mailboxExecutor.execute(
+ () -> enqueueMessageSending(topic, builder),
+ "Failed to send message to Pulsar");
+ }
+ }
+
+ /**
+ * Sends the message and waits for the result. The permit taken in {@link #requirePermits()} is
+ * released in the completion callback, whether the send succeeded or failed.
+ *
+ * <p>NOTE(review): assuming {@code sendAsync()} returns a {@code
+ * java.util.concurrent.CompletableFuture}, an exception thrown by a {@code whenComplete} action
+ * of an already failed stage is dropped in favor of the stage's own exception, so {@code get()}
+ * would report the original send failure rather than the {@link FlinkRuntimeException} created
+ * below. Confirm.
+ */
+ private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
+ throws ExecutionException, InterruptedException {
+ // The trailing get() blocks the calling thread until the send has completed.
+ builder.sendAsync()
+ .whenComplete(
+ (id, ex) -> {
+ this.releasePermits();
+ if (ex != null) {
+ throw new FlinkRuntimeException(
+ "Failed to send data to Pulsar " + topic, ex);
+ } else {
+ LOG.debug(
+ "Sent message to Pulsar {} with message id {}", topic, id);
+ }
+ })
+ .get();
+ }
+
+ /**
+ * Takes one permit. While the number of pending messages has reached {@link
+ * SinkConfiguration#getMaxPendingMessages()}, control is yielded to the mailbox executor.
+ */
+ private void requirePermits() throws InterruptedException {
+ while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
+ LOG.info("Waiting for the available permits.");
+ mailboxExecutor.yield();
+ }
+ pendingMessages++;
+ }
+
+ /** Gives back one permit taken by {@link #requirePermits()}. */
+ private void releasePermits() {
+ this.pendingMessages -= 1;
+ }
+
+ /**
+ * Creates a message builder for the topic and copies the fields of the given message onto it.
+ * Ordering key, key, properties and replication clusters are only set when non-empty, the
+ * sequence id only when non-null.
+ */
+ @SuppressWarnings("rawtypes")
+ private TypedMessageBuilder<?> createMessageBuilder(
+ String topic, Context context, PulsarMessage<?> message) {
+
+ Schema<?> schema = message.getSchema();
+ TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);
+
+ byte[] orderingKey = message.getOrderingKey();
+ if (orderingKey != null && orderingKey.length > 0) {
+ builder.orderingKey(orderingKey);
+ }
+
+ String key = message.getKey();
+ if (!Strings.isNullOrEmpty(key)) {
+ builder.key(key);
+ }
+
+ long eventTime = message.getEventTime();
+ if (eventTime > 0) {
+ builder.eventTime(eventTime);
+ } else {
+ // Set default message timestamp if flink has provided one.
+ Long timestamp = context.timestamp();
+ if (timestamp != null) {
+ builder.eventTime(timestamp);
+ }
+ }
+
+ // Schema evolution would serialize the message by Pulsar Schema in TypedMessageBuilder.
+ // The type has been checked in PulsarMessageBuilder#value.
+ ((TypedMessageBuilder) builder).value(message.getValue());
+
+ Map<String, String> properties = message.getProperties();
+ if (properties != null && !properties.isEmpty()) {
+ builder.properties(properties);
+ }
+
+ Long sequenceId = message.getSequenceId();
+ if (sequenceId != null) {
+ builder.sequenceId(sequenceId);
+ }
+
+ List<String> clusters = message.getReplicationClusters();
+ if (clusters != null && !clusters.isEmpty()) {
+ builder.replicationClusters(clusters);
+ }
+
+ if (message.isDisableReplication()) {
+ builder.disableReplication();
+ }
+
+ return builder;
+ }
+
+ /**
+ * Flushes the producers. At the end of the input the producers are flushed exactly once.
+ * Otherwise, unless the delivery guarantee is {@link DeliveryGuarantee#NONE}, the producers are
+ * flushed and the mailbox is yielded repeatedly until no pending message is left.
+ */
+ @Override
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ if (endOfInput) {
+ // Try flush only once when we meet the end of the input.
+ producerRegister.flush();
+ } else {
+ while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
+ producerRegister.flush();
+ LOG.info("Flush the pending messages to Pulsar.");
+ mailboxExecutor.yield();
+ }
+ }
+ }
+
+ /**
+ * Returns the committables of the producer register for {@link
+ * DeliveryGuarantee#EXACTLY_ONCE}, and an empty list for every other guarantee.
+ */
+ @Override
+ public Collection<PulsarCommittable> prepareCommit() {
+ if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+ return producerRegister.prepareCommit();
+ } else {
+ return emptyList();
+ }
+ }
+
+ /** Closes the topic metadata listener and the producer register. */
+ @Override
+ public void close() throws Exception {
+ // Close all the resources and throw the exception at last.
+ closeAll(metadataListener, producerRegister);
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
new file mode 100644
index 0000000..5c93339
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.context;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** This context provides runtime information of the Pulsar sink to the user-facing callbacks. */
+@PublicEvolving
+public interface PulsarSinkContext {
+
+ /**
+ * Get the number of the subtask that PulsarSink is running on. The numbering starts from 0 and
+ * goes up to parallelism-1 (parallelism as returned by {@link
+ * #getNumberOfParallelInstances()}).
+ *
+ * @return number of subtask
+ */
+ int getParallelInstanceId();
+
+ /** @return the number of parallel PulsarSink tasks. */
+ int getNumberOfParallelInstances();
+
+ /**
+ * Pulsar can check the schema and upgrade the schema automatically. If you enable this option,
+ * we don't serialize the record into bytes ourselves; the record is sent as is and serialized
+ * in the Pulsar client.
+ */
+ boolean isEnableSchemaEvolution();
+
+ /** Returns the current processing time in Flink. */
+ long processTime();
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
new file mode 100644
index 0000000..681b25a
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.context;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+
+/**
+ * The {@link PulsarSinkContext} used by the writer. Every value is captured from the sink's {@link
+ * InitContext} and {@link SinkConfiguration} at construction time.
+ */
+@Internal
+public class PulsarSinkContextImpl implements PulsarSinkContext {
+
+    private final int subtaskId;
+    private final int parallelism;
+    private final ProcessingTimeService timeService;
+    private final boolean schemaEvolutionEnabled;
+
+    public PulsarSinkContextImpl(InitContext initContext, SinkConfiguration sinkConfiguration) {
+        subtaskId = initContext.getSubtaskId();
+        parallelism = initContext.getNumberOfParallelSubtasks();
+        timeService = initContext.getProcessingTimeService();
+        schemaEvolutionEnabled = sinkConfiguration.isEnableSchemaEvolution();
+    }
+
+    /** Returns the subtask id reported by the init context. */
+    @Override
+    public int getParallelInstanceId() {
+        return subtaskId;
+    }
+
+    /** Returns the number of parallel subtasks reported by the init context. */
+    @Override
+    public int getNumberOfParallelInstances() {
+        return parallelism;
+    }
+
+    /** Returns the schema evolution switch read from the sink configuration. */
+    @Override
+    public boolean isEnableSchemaEvolution() {
+        return schemaEvolutionEnabled;
+    }
+
+    /** Asks the processing time service for its current processing time. */
+    @Override
+    public long processTime() {
+        return timeService.getCurrentProcessingTime();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java
new file mode 100644
index 0000000..433d79c
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
+import org.apache.pulsar.client.impl.Hash;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+/**
+ * The router behind the {@link TopicRoutingMode#MESSAGE_KEY_HASH} policy. A message with a key is
+ * sent to the topic selected by the key's hash, using the hash function from the sink
+ * configuration. A message without a key is sent to a randomly picked topic.
+ *
+ * @param <IN> The message type which should write to Pulsar.
+ */
+@Internal
+public class KeyHashTopicRouter<IN> implements TopicRouter<IN> {
+    private static final long serialVersionUID = 2475614648095079804L;
+
+    private final MessageKeyHash messageKeyHash;
+
+    public KeyHashTopicRouter(SinkConfiguration sinkConfiguration) {
+        this.messageKeyHash = sinkConfiguration.getMessageKeyHash();
+    }
+
+    /**
+     * Picks one entry of {@code partitions} for the given message key.
+     *
+     * @throws IllegalArgumentException if {@code partitions} is empty.
+     */
+    @Override
+    public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) {
+        checkArgument(
+                !partitions.isEmpty(),
+                "You should provide topics for routing topic by message key hash.");
+
+        final int candidates = partitions.size();
+
+        if (!Strings.isNullOrEmpty(key)) {
+            // Hash the message key and map the (possibly negative) code onto a valid index.
+            int code = messageKeyHash.getHash().makeHash(key);
+            return partitions.get(signSafeMod(code, candidates));
+        }
+
+        // No usable key, so any topic will do.
+        return partitions.get(ThreadLocalRandom.current().nextInt(candidates));
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
new file mode 100644
index 0000000..7f35760
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import org.apache.pulsar.client.impl.Hash;
+import org.apache.pulsar.client.impl.JavaStringHash;
+import org.apache.pulsar.client.impl.Murmur3_32Hash;
+
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** The predefined hash functions which can be chosen for routing a message by its key. */
+@PublicEvolving
+public enum MessageKeyHash implements DescribedEnum {
+    /** Hash the message key in the same way as <code>String.hashCode()</code>. */
+    JAVA_HASH(
+            "java-hash",
+            text(
+                    "This hash would use %s to calculate the message key string's hash code.",
+                    code("String.hashCode()"))) {
+        @Override
+        public Hash getHash() {
+            return JavaStringHash.getInstance();
+        }
+    },
+    /**
+     * Hash the message key with the Murmur3 hashing function. <a
+     * href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
+     */
+    MURMUR3_32_HASH(
+            "murmur-3-32-hash",
+            text(
+                    "This hash would calculate message key's hash code by using %s algorithm.",
+                    link("https://en.wikipedia.org/wiki/MurmurHash", "Murmur3"))) {
+        @Override
+        public Hash getHash() {
+            return Murmur3_32Hash.getInstance();
+        }
+    };
+
+    private final String optionValue;
+    private final InlineElement description;
+
+    MessageKeyHash(String optionValue, InlineElement description) {
+        this.optionValue = optionValue;
+        this.description = description;
+    }
+
+    /** Return the Pulsar hash implementation which backs this constant. */
+    @Internal
+    public abstract Hash getHash();
+
+    @Internal
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return optionValue;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java
new file mode 100644
index 0000000..b9c654a
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * If you choose the {@link TopicRoutingMode#ROUND_ROBIN} policy, we would use this implementation.
+ * The topics are used one after another. We switch to the next topic after every {@code
+ * partitionSwitchSize} messages, so the consecutive messages are written to the same topic.
+ *
+ * @param <IN> The message type which should write to Pulsar.
+ */
+@Internal
+public class RoundRobinTopicRouter<IN> implements TopicRouter<IN> {
+    private static final long serialVersionUID = -1160533263474038206L;
+
+    /** The internal counter for counting the messages. */
+    private final AtomicLong counter = new AtomicLong(0);
+
+    /** The number of messages written to a topic before we switch to the next one. */
+    private final int partitionSwitchSize;
+
+    public RoundRobinTopicRouter(SinkConfiguration configuration) {
+        this.partitionSwitchSize = configuration.getPartitionSwitchSize();
+    }
+
+    /** Return the topic for the next message; an empty {@code partitions} list is rejected. */
+    @Override
+    public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) {
+        checkArgument(
+                !partitions.isEmpty(),
+                "You should provide topics for routing topic by round robin.");
+
+        long batch = counter.getAndIncrement() / partitionSwitchSize;
+        // floorMod keeps the index in [0, size) even if the counter overflows to negative.
+        int topicIndex = (int) Math.floorMod(batch, (long) partitions.size());
+        return partitions.get(topicIndex);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java
new file mode 100644
index 0000000..a2c0589
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The router for choosing the desired topic to write the Flink records. The user can implement this
+ * router for complex requirements. We have provided some easy-to-use implementations.
+ *
+ * <p>The router is {@link Serializable} and shouldn't rely on dynamic state. Fields which can't be
+ * serialized should be created in {@link #open(SinkConfiguration)}.
+ *
+ * @param <IN> The type of the records which need to be written to Pulsar.
+ */
+@PublicEvolving
+public interface TopicRouter<IN> extends Serializable {
+
+ /**
+ * Choose the topic by the given record and the available partition list. You can return a new
+ * topic name if you need it.
+ *
+ * @param in The record instance which needs to be written to Pulsar.
+ * @param key The key of the message from {@link PulsarMessageBuilder#key(String)}. It could be
+ * null if the message doesn't have a key.
+ * @param partitions The available partition list. This could be empty if you don't provide any
+ * topics in {@link PulsarSinkBuilder#setTopics(String...)}. You can return a custom topic,
+ * but make sure it contains a partition index in its name. Using {@link
+ * TopicNameUtils#topicNameWithPartition(String, int)} can easily create a topic name with
+ * a partition index.
+ * @param context The context contains useful information for determining the topic.
+ * @return The topic name to use.
+ */
+ String route(IN in, String key, List<String> partitions, PulsarSinkContext context);
+
+ /** Implement this method if you have some non-serializable field. */
+ default void open(SinkConfiguration sinkConfiguration) {
+ // Nothing to do by default.
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
new file mode 100644
index 0000000..c327435
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+
+/** The policy which decides how the target topic is picked for a given message. */
+@PublicEvolving
+public enum TopicRoutingMode implements DescribedEnum {
+
+    /**
+     * Write the messages to all the partitions in a round-robin fashion to achieve the maximum
+     * throughput. The partition isn't switched for every single message, it is switched on the
+     * boundary of {@code PULSAR_BATCHING_MAX_MESSAGES}, which keeps the batching effective.
+     */
+    ROUND_ROBIN(
+            "round-robin",
+            text(
+                    "The producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput."
+                            + " Please note that round-robin is not done per individual message"
+                            + " but rather it's set to the same boundary of %s, to ensure batching is effective.",
+                    code(PULSAR_BATCHING_MAX_MESSAGES.key()))),
+
+    /**
+     * A message which carries a key is assigned to a particular partition by the hash of the key.
+     * If no key is provided, the partitioned producer will randomly pick one single topic partition
+     * and publish all the messages into that partition.
+     */
+    MESSAGE_KEY_HASH(
+            "message-key-hash",
+            text(
+                    "If no key is provided, The partitioned producer will randomly pick one single topic partition"
+                            + " and publish all the messages into that partition. If a key is provided on the message,"
+                            + " the partitioned producer will hash the key and assign the message to a particular partition.")),
+
+    /**
+     * The partition of every message is determined by a custom {@link TopicRouter} implementation
+     * provided by the user.
+     */
+    CUSTOM(
+            "custom",
+            text(
+                    "Use custom %s implementation that will be called to determine the partition for a particular message.",
+                    code(TopicRouter.class.getSimpleName())));
+
+    private final String optionValue;
+    private final InlineElement description;
+
+    TopicRoutingMode(String optionValue, InlineElement description) {
+        this.optionValue = optionValue;
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return optionValue;
+    }
+
+    @Internal
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
new file mode 100644
index 0000000..acd1c61
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Objects;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+
+/**
+ * We need the latest topic metadata for making sure the newly created topic partitions would be
+ * used by the Pulsar sink. This routing policy would be different compared with Pulsar Client
+ * built-in logic. We use Flink's {@link ProcessingTimeService} as the executor of the refresh.
+ */
+@Internal
+public class TopicMetadataListener implements Serializable, Closeable {
+    private static final long serialVersionUID = 6186948471557507522L;
+    private static final Logger LOG = LoggerFactory.getLogger(TopicMetadataListener.class);
+
+    private final ImmutableList<String> partitionedTopics;
+    /** The partition number of the topics given without a partition index, -1 if unknown. */
+    private final Map<String, Integer> topicMetadata;
+    /** The cached result of {@link #availableTopics()}, emptied when the metadata changes. */
+    private volatile ImmutableList<String> availableTopics;
+
+    // Dynamic fields.
+    private transient PulsarAdmin pulsarAdmin;
+    private transient Long topicMetadataRefreshInterval;
+    private transient ProcessingTimeService timeService;
+
+    public TopicMetadataListener() {
+        this(emptyList());
+    }
+
+    public TopicMetadataListener(List<String> topics) {
+        List<String> partitions = new ArrayList<>(topics.size());
+        Map<String, Integer> metadata = new HashMap<>(topics.size());
+        for (String topic : topics) {
+            if (isPartitioned(topic)) {
+                partitions.add(topic);
+            } else {
+                // This would be updated when open writing.
+                metadata.put(topic, -1);
+            }
+        }
+
+        this.partitionedTopics = ImmutableList.copyOf(partitions);
+        this.topicMetadata = metadata;
+        this.availableTopics = ImmutableList.of();
+    }
+
+    /** Query the topic metadata once and register its update in process time service. */
+    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) {
+        if (topicMetadata.isEmpty()) {
+            LOG.info("No topics have been provided, skip listener initialize.");
+            return;
+        }
+
+        // Initialize listener properties.
+        this.pulsarAdmin = createAdmin(sinkConfiguration);
+        this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
+        this.timeService = timeService;
+
+        // Initialize the topic metadata. Quit if fail to connect to Pulsar.
+        sneakyAdmin(this::updateTopicMetadata);
+
+        // Register time service.
+        triggerNextTopicMetadataUpdate(true);
+    }
+
+    /**
+     * Return all the available topic partitions. We would recalculate the partitions if the topic
+     * metadata has been changed. Otherwise, we would return the cached result for better
+     * performance.
+     */
+    public List<String> availableTopics() {
+        if (availableTopics.isEmpty()
+                && (!partitionedTopics.isEmpty() || !topicMetadata.isEmpty())) {
+            List<String> results = new ArrayList<>();
+            for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) {
+                // NOTE(review): a topic with -1 (unknown) or 0 partitions adds nothing; confirm.
+                for (int i = 0; i < entry.getValue(); i++) {
+                    results.add(topicNameWithPartition(entry.getKey(), i));
+                }
+            }
+
+            results.addAll(partitionedTopics);
+            this.availableTopics = ImmutableList.copyOf(results);
+        }
+
+        return availableTopics;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pulsarAdmin != null) {
+            pulsarAdmin.close();
+        }
+    }
+
+    /** Refresh the metadata (skipped for the initial call) and register the next timer. */
+    private void triggerNextTopicMetadataUpdate(boolean initial) {
+        if (!initial) {
+            // Ignore the pulsar admin exception, the stale metadata is kept until the next try.
+            try {
+                updateTopicMetadata();
+            } catch (PulsarAdminException e) {
+                LOG.warn("Failed to update the topic metadata, retry on next trigger.", e);
+            }
+        }
+
+        // Register next timer.
+        long triggerTime = timeService.getCurrentProcessingTime() + topicMetadataRefreshInterval;
+        timeService.registerTimer(triggerTime, time -> triggerNextTopicMetadataUpdate(false));
+    }
+
+    /** Query the partition number of every topic, drop the cached topics if any has changed. */
+    private void updateTopicMetadata() throws PulsarAdminException {
+        boolean shouldUpdate = false;
+        for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) {
+            PartitionedTopicMetadata metadata =
+                    pulsarAdmin.topics().getPartitionedTopicMetadata(entry.getKey());
+            // Update topic metadata if it has been changed.
+            if (!Objects.equal(entry.getValue(), metadata.partitions)) {
+                entry.setValue(metadata.partitions);
+                shouldUpdate = true;
+            }
+        }
+
+        // Clear available topics if the topic metadata has been changed.
+        if (shouldUpdate) {
+            this.availableTopics = ImmutableList.of();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
new file mode 100644
index 0000000..9bb1753
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar producers share the same client, but every producer holds its own queue for a
+ * specified topic. So we have to create different producers for different topics.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+    private final PulsarClient pulsarClient;
+    private final SinkConfiguration sinkConfiguration;
+    private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;
+    private final Map<String, Transaction> transactionRegister;
+
+    public TopicProducerRegister(SinkConfiguration sinkConfiguration) {
+        this.pulsarClient = createClient(sinkConfiguration);
+        this.sinkConfiguration = sinkConfiguration;
+        this.producerRegister = new HashMap<>();
+        this.transactionRegister = new HashMap<>();
+    }
+
+    /**
+     * Create a TypedMessageBuilder for the given topic. First, we would create a topic-related
+     * producer or use a cached one. For the exactly-once guarantee, the message builder is bound to
+     * the topic-related transaction, which is created if there is none. Otherwise, a message
+     * builder without transaction is returned.
+     */
+    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, Schema<T> schema) {
+        Producer<T> producer = getOrCreateProducer(topic, schema);
+        DeliveryGuarantee deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            Transaction transaction = getOrCreateTransaction(topic);
+            return producer.newMessage(transaction);
+        } else {
+            return producer.newMessage();
+        }
+    }
+
+    /**
+     * Convert the transactions into a committable list for Pulsar Committer. The transactions are
+     * removed from this register afterwards, new ones would be created for the next messages.
+     */
+    public List<PulsarCommittable> prepareCommit() {
+        List<PulsarCommittable> committables = new ArrayList<>(transactionRegister.size());
+        transactionRegister.forEach(
+                (topic, transaction) -> {
+                    TxnID txnID = transaction.getTxnID();
+                    PulsarCommittable committable = new PulsarCommittable(txnID, topic);
+                    committables.add(committable);
+                });
+
+        clearTransactions();
+        return committables;
+    }
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been
+     * successfully persisted.
+     */
+    public void flush() throws IOException {
+        for (Map<SchemaInfo, Producer<?>> producers : producerRegister.values()) {
+            for (Producer<?> producer : producers.values()) {
+                producer.flush();
+            }
+        }
+    }
+
+    /**
+     * Flush the pending messages, abort the open transactions, drop the producers and close the
+     * client, in this order. A failing step doesn't prevent the following steps from running.
+     */
+    @Override
+    public void close() throws IOException {
+        // The closer closes its resources in the REVERSE order of the registration (LIFO). So
+        // the step which has to run last is registered first.
+        try (Closer closer = Closer.create()) {
+            // Runs last. All the producers would be closed by this method.
+            // We would block until all the producers have been successfully closed.
+            closer.register(pulsarClient);
+
+            // Runs third. Remove all the producers.
+            closer.register(producerRegister::clear);
+
+            // Runs second. Abort all the existed transactions.
+            closer.register(this::abortTransactions);
+
+            // Runs first. Flush all the pending messages to Pulsar.
+            closer.register(this::flush);
+        }
+    }
+
+    /** Create or return the cached topic-related producer. */
+    @SuppressWarnings("unchecked")
+    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
+        Map<SchemaInfo, Producer<?>> producers =
+                producerRegister.computeIfAbsent(topic, key -> new HashMap<>());
+        SchemaInfo schemaInfo = schema.getSchemaInfo();
+
+        if (producers.containsKey(schemaInfo)) {
+            return (Producer<T>) producers.get(schemaInfo);
+        } else {
+            ProducerBuilder<T> builder =
+                    createProducerBuilder(pulsarClient, schema, sinkConfiguration);
+            // Set the required topic name.
+            builder.topic(topic);
+            Producer<T> producer = sneakyClient(builder::create);
+            producers.put(schemaInfo, producer);
+
+            return producer;
+        }
+    }
+
+    /** Return the cached topic-related transaction, or create a new one if there is none. */
+    private Transaction getOrCreateTransaction(String topic) {
+        return transactionRegister.computeIfAbsent(
+                topic,
+                t -> {
+                    long timeoutMillis = sinkConfiguration.getTransactionTimeoutMillis();
+                    return createTransaction(pulsarClient, timeoutMillis);
+                });
+    }
+
+    /** Abort the existed transactions. This method would be used when closing PulsarWriter. */
+    private void abortTransactions() {
+        if (transactionRegister.isEmpty()) {
+            return;
+        }
+
+        TransactionCoordinatorClient coordinatorClient =
+                ((PulsarClientImpl) pulsarClient).getTcClient();
+        // This null check is used for making sure transaction is enabled in client.
+        checkNotNull(coordinatorClient);
+
+        try (Closer closer = Closer.create()) {
+            for (Transaction transaction : transactionRegister.values()) {
+                TxnID txnID = transaction.getTxnID();
+                closer.register(() -> coordinatorClient.abort(txnID));
+            }
+
+            clearTransactions();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /** Forget the transactions, a new one would be created when a new message comes. */
+    private void clearTransactions() {
+        transactionRegister.clear();
+    }
+}