Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:08 UTC

[flink] 06/09: [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 136add5d0c9c5b9b2869a9ee194f78449065b18e
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Feb 15 22:22:19 2022 +0800

    [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink.
---
 .../common/utils/PulsarTransactionUtils.java       |  68 ++++
 .../flink/connector/pulsar/sink/PulsarSink.java    | 136 ++++++++
 .../connector/pulsar/sink/PulsarSinkBuilder.java   | 354 +++++++++++++++++++++
 .../connector/pulsar/sink/PulsarSinkOptions.java   |  14 +-
 .../pulsar/sink/committer/PulsarCommittable.java   |  71 +++++
 .../committer/PulsarCommittableSerializer.java     |  65 ++++
 .../pulsar/sink/committer/PulsarCommitter.java     | 174 ++++++++++
 .../pulsar/sink/config/SinkConfiguration.java      |  17 +-
 .../connector/pulsar/sink/writer/PulsarWriter.java | 264 +++++++++++++++
 .../sink/writer/context/PulsarSinkContext.java     |  46 +++
 .../sink/writer/context/PulsarSinkContextImpl.java |  61 ++++
 .../sink/writer/router/KeyHashTopicRouter.java     |  71 +++++
 .../pulsar/sink/writer/router/MessageKeyHash.java  |  85 +++++
 .../sink/writer/router/RoundRobinTopicRouter.java  |  63 ++++
 .../pulsar/sink/writer/router/TopicRouter.java     |  64 ++++
 .../sink/writer/router/TopicRoutingMode.java       |  87 +++++
 .../sink/writer/topic/TopicMetadataListener.java   | 173 ++++++++++
 .../sink/writer/topic/TopicProducerRegister.java   | 202 ++++++++++++
 18 files changed, 2011 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
new file mode 100644
index 0000000..a48b4d4
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+
+/** A suite of workarounds for the Pulsar transaction. */
+@Internal
+public final class PulsarTransactionUtils {
+
+    private PulsarTransactionUtils() {
+        // No public constructor
+    }
+
+    /** Create a transaction with the given timeout in milliseconds. */
+    public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
+        try {
+            CompletableFuture<Transaction> future =
+                    sneakyClient(pulsarClient::newTransaction)
+                            .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+                            .build();
+
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        } catch (ExecutionException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /**
+     * Works around a bug in the original {@link TransactionCoordinatorClientException#unwrap(Throwable)}
+     * method: Pulsar wraps the {@link ExecutionException}, which hides the real execution exception.
+     */
+    public static TransactionCoordinatorClientException unwrap(
+            TransactionCoordinatorClientException e) {
+        return findThrowable(e.getCause(), TransactionCoordinatorClientException.class).orElse(e);
+    }
+}
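
For illustration, a minimal usage sketch for this utility (not part of this commit).
The client setup and service URL below are placeholder assumptions; note that
transactions must be enabled on the client, or newTransaction() will fail.

    // Hypothetical client setup; the service URL is a placeholder.
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .enableTransaction(true)
            .build();

    // Create a transaction that the broker may abort once its 10-minute timeout expires.
    Transaction txn = PulsarTransactionUtils.createTransaction(client, 600_000L);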
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
new file mode 100644
index 0000000..811d5b5
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Sink implementation of Pulsar. Please use a {@link PulsarSinkBuilder} to construct a {@link
+ * PulsarSink}. The following example shows how to create a PulsarSink receiving records of {@code
+ * String} type.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *      .setServiceUrl(operator().serviceUrl())
+ *      .setAdminUrl(operator().adminUrl())
+ *      .setTopics(topic)
+ *      .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *      .build();
+ * }</pre>
+ *
+ * <p>The sink supports all delivery guarantees described by {@link DeliveryGuarantee}.
+ *
+ * <ul>
+ *   <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in
+ *       case of issues on the Pulsar broker and messages may be duplicated in case of a Flink
+ *       failure.
+ *   <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in
+ *       the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages
+ *       will be lost in case of any issue with the Pulsar brokers but messages may be duplicated
+ *       when Flink restarts.
+ *   <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: In this mode the PulsarSink will write all messages
+ *       in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no
+ *       duplicates will be seen in case of a Flink restart. However, this delays record writing
+ *       effectively until a checkpoint is written, so adjust the checkpoint duration accordingly.
+ *       Additionally, it is highly recommended to set the Pulsar transaction timeout (link) to a
+ *       value much greater than the maximum checkpoint duration plus the maximum restart duration;
+ *       otherwise data loss may happen when Pulsar expires an uncommitted transaction.
+ * </ul>
+ *
+ * <p>See {@link PulsarSinkBuilder} for more details.
+ *
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommittable> {
+    private static final long serialVersionUID = 4416714587951282119L;
+
+    private final SinkConfiguration sinkConfiguration;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicMetadataListener metadataListener;
+    private final TopicRouter<IN> topicRouter;
+
+    PulsarSink(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            TopicRoutingMode topicRoutingMode,
+            TopicRouter<IN> topicRouter) {
+        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+        this.serializationSchema = checkNotNull(serializationSchema);
+        this.metadataListener = checkNotNull(metadataListener);
+        checkNotNull(topicRoutingMode);
+
+        // Create topic router supplier.
+        if (topicRoutingMode == TopicRoutingMode.CUSTOM) {
+            this.topicRouter = checkNotNull(topicRouter);
+        } else if (topicRoutingMode == TopicRoutingMode.ROUND_ROBIN) {
+            this.topicRouter = new RoundRobinTopicRouter<>(sinkConfiguration);
+        } else {
+            this.topicRouter = new KeyHashTopicRouter<>(sinkConfiguration);
+        }
+    }
+
+    /**
+     * Create a {@link PulsarSinkBuilder} to construct a new {@link PulsarSink}.
+     *
+     * @param <IN> Type of incoming records.
+     * @return A Pulsar sink builder.
+     */
+    public static <IN> PulsarSinkBuilder<IN> builder() {
+        return new PulsarSinkBuilder<>();
+    }
+
+    @Internal
+    @Override
+    public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext) {
+        return new PulsarWriter<>(
+                sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext);
+    }
+
+    @Internal
+    @Override
+    public Committer<PulsarCommittable> createCommitter() {
+        return new PulsarCommitter(sinkConfiguration);
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<PulsarCommittable> getCommittableSerializer() {
+        return new PulsarCommittableSerializer();
+    }
+}
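
As a usage sketch (not part of this commit), the sink plugs into a regular
DataStream job through sinkTo(). The URLs and topic below are placeholders, and
checkpointing must be enabled for EXACTLY_ONCE to make progress.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(30_000L); // checkpoints drive the transaction commits

    PulsarSink<String> sink = PulsarSink.<String>builder()
            .setServiceUrl("pulsar://localhost:6650")
            .setAdminUrl("http://localhost:8080")
            .setTopics("persistent://public/default/example-topic")
            .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .build();

    env.fromElements("a", "b", "c").sinkTo(sink);
    env.execute("pulsar-sink-example");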
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
new file mode 100644
index 0000000..a0352f5
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link
+ * PulsarSink}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSink that writes String
+ * values to a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *     .setServiceUrl(operator().serviceUrl())
+ *     .setAdminUrl(operator().adminUrl())
+ *     .setTopics(topic)
+ *     .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service URL, admin URL, and the record serializer are required fields that must be set. If
+ * you don't set the topics, make sure you have provided a custom {@link TopicRouter}; otherwise,
+ * you must provide the topics to produce to.
+ *
+ * <p>To specify the delivery guarantees of PulsarSink, one can call {@link
+ * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link
+ * DeliveryGuarantee#NONE}, which does not guarantee consistency when writing messages into
+ * Pulsar.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *     .setServiceUrl(operator().serviceUrl())
+ *     .setAdminUrl(operator().adminUrl())
+ *     .setTopics(topic)
+ *     .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *     .setDeliveryGuarantee(deliveryGuarantee)
+ *     .build();
+ * }</pre>
+ *
+ * @see PulsarSink for a more detailed explanation of the different guarantees.
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSinkBuilder<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+    private final PulsarConfigBuilder configBuilder;
+
+    private PulsarSerializationSchema<IN> serializationSchema;
+    private TopicMetadataListener metadataListener;
+    private TopicRoutingMode topicRoutingMode;
+    private TopicRouter<IN> topicRouter;
+
+    // private builder constructor.
+    PulsarSinkBuilder() {
+        this.configBuilder = new PulsarConfigBuilder();
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+     *
+     * @param adminUrl The url for the PulsarAdmin.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
+        return setConfig(PULSAR_ADMIN_URL, adminUrl);
+    }
+
+    /**
+     * Sets the service URL for the PulsarProducer of the PulsarSink.
+     *
+     * @param serviceUrl The server url of the Pulsar cluster.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setServiceUrl(String serviceUrl) {
+        return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+    }
+
+    /**
+     * The producer name is informative, and it can be used to identify a particular producer
+     * instance from the topic stats.
+     *
+     * @param producerName The name of the producer used in Pulsar sink.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setProducerName(String producerName) {
+        return setConfig(PULSAR_PRODUCER_NAME, producerName);
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink sink. Some topics may not exist yet; writing to a
+     * non-existent topic won't throw any exception.
+     *
+     * @param topics The list of topics you would like to produce messages to.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink sink. Some topics may not exist yet; writing to a
+     * non-existent topic won't throw any exception.
+     *
+     * @param topics The list of topics you would like to produce messages to.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopics(List<String> topics) {
+        checkState(metadataListener == null, "setTopics couldn't be called twice.");
+        // Make sure the topics are distinct.
+        List<String> topicSet = distinctTopics(topics);
+        this.metadataListener = new TopicMetadataListener(topicSet);
+        return this;
+    }
+
+    /**
+     * Sets the desired {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+     * DeliveryGuarantee#NONE}.
+     *
+     * @param deliveryGuarantee The delivery guarantee.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+        checkNotNull(deliveryGuarantee, "deliveryGuarantee");
+        configBuilder.override(PULSAR_WRITE_DELIVERY_GUARANTEE, deliveryGuarantee);
+        return this;
+    }
+
+    /**
+     * Set a routing mode for choosing the right topic partition to send messages to.
+     *
+     * @param topicRoutingMode Routing policy for choosing the desired topic.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopicRoutingMode(TopicRoutingMode topicRoutingMode) {
+        checkArgument(
+                topicRoutingMode != TopicRoutingMode.CUSTOM,
+                "CUSTOM mode should be set by using setTopicRouter method.");
+        this.topicRoutingMode = checkNotNull(topicRoutingMode, "topicRoutingMode");
+        return this;
+    }
+
+    /**
+     * Use a custom topic router instead of the predefined topic routing.
+     *
+     * @param topicRouter The router for choosing the topic to send messages to.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopicRouter(TopicRouter<IN> topicRouter) {
+        if (topicRoutingMode != null && topicRoutingMode != TopicRoutingMode.CUSTOM) {
+            LOG.warn("We would override topicRoutingMode to CUSTOM if you provide TopicRouter.");
+        }
+        this.topicRoutingMode = TopicRoutingMode.CUSTOM;
+        this.topicRouter = checkNotNull(topicRouter, "topicRouter");
+        return this;
+    }
+
+    /**
+     * Sets the {@link PulsarSerializationSchema} that transforms incoming records to bytes.
+     *
+     * @param serializationSchema The Pulsar-specific serialization logic.
+     * @return this PulsarSinkBuilder.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+            PulsarSerializationSchema<T> serializationSchema) {
+        PulsarSinkBuilder<T> self = specialized();
+        self.serializationSchema = serializationSchema;
+        return self;
+    }
+
+    /**
+     * If you enable this option, we will serialize and send the message by using the Pulsar
+     * {@link Schema}.
+     *
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> enableSchemaEvolution() {
+        configBuilder.override(PULSAR_WRITE_SCHEMA_EVOLUTION, true);
+        return this;
+    }
+
+    /**
+     * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
+     * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+     *
+     * <p>Make sure each option is set only once, or always with the same value.
+     *
+     * @param key The key of the property.
+     * @param value The value of the property.
+     * @return this PulsarSinkBuilder.
+     */
+    public <T> PulsarSinkBuilder<IN> setConfig(ConfigOption<T> key, T value) {
+        configBuilder.set(key, value);
+        return this;
+    }
+
+    /**
+     * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found
+     * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+     *
+     * @param config The config to set for the PulsarSink.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setConfig(Configuration config) {
+        configBuilder.set(config);
+        return this;
+    }
+
+    /**
+     * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found
+     * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
+     *
+     * <p>This method is mainly used for the future Flink SQL binding.
+     *
+     * @param properties The config properties to set for the PulsarSink.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setProperties(Properties properties) {
+        configBuilder.set(properties);
+        return this;
+    }
+
+    /**
+     * Build the {@link PulsarSink}.
+     *
+     * @return a PulsarSink with the settings made for this builder.
+     */
+    public PulsarSink<IN> build() {
+        // Validate the delivery guarantee and adjust related options.
+        DeliveryGuarantee deliveryGuarantee = configBuilder.get(PULSAR_WRITE_DELIVERY_GUARANTEE);
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+            LOG.warn(
+                    "You haven't set delivery guarantee or set it to NONE, this would cause data loss. Make sure you have known this shortcoming.");
+        } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            LOG.info(
+                    "Exactly once require flink checkpoint and your pulsar cluster should support the transaction.");
+            configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
+            configBuilder.override(PULSAR_SEND_TIMEOUT_MS, 0L);
+
+            if (!configBuilder.contains(PULSAR_WRITE_TRANSACTION_TIMEOUT)) {
+                LOG.warn(
+                        "The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval.");
+            } else {
+                Long timeout = configBuilder.get(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+                LOG.warn(
+                        "The configured transaction timeout is {} mille seconds, make sure it was greater than your checkpoint interval.",
+                        timeout);
+            }
+        }
+
+        if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
+            LOG.warn(
+                    "We recommend set a readable producer name through setProducerName(String) in production mode.");
+        }
+
+        checkNotNull(serializationSchema, "serializationSchema must be set.");
+        if (serializationSchema instanceof PulsarSchemaWrapper
+                && !Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) {
+            LOG.info(
+                    "It seems like you want to send message in Pulsar Schema."
+                            + " You can enableSchemaEvolution for using this feature."
+                            + " We would use Schema.BYTES as the default schema if you don't enable this option.");
+        }
+
+        // Topic metadata listener validation.
+        if (metadataListener == null) {
+            if (topicRouter == null) {
+                throw new NullPointerException(
+                        "Neither topic names nor a custom topic router has been provided.");
+            } else {
+                LOG.warn(
+                        "No topic set has been provided, make sure your custom topic router support empty topic set.");
+                this.metadataListener = new TopicMetadataListener();
+            }
+        }
+
+        // Topic routing mode validation.
+        if (topicRoutingMode == null) {
+            LOG.info("No topic routing mode has been chosen. We use round-robin mode as default.");
+            this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
+        }
+
+        // This is an unmodifiable configuration for Pulsar.
+        // We don't use Pulsar's built-in configuration classes for compatibility reasons.
+        SinkConfiguration sinkConfiguration =
+                configBuilder.build(SINK_CONFIG_VALIDATOR, SinkConfiguration::new);
+
+        return new PulsarSink<>(
+                sinkConfiguration,
+                serializationSchema,
+                metadataListener,
+                topicRoutingMode,
+                topicRouter);
+    }
+
+    // ------------- private helpers  --------------
+
+    /** Helper method to make the Java compiler recognize the generic type. */
+    @SuppressWarnings("unchecked")
+    private <T extends IN> PulsarSinkBuilder<T> specialized() {
+        return (PulsarSinkBuilder<T>) this;
+    }
+}
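
Following the transaction-timeout warnings in build(), a hedged sketch of an
exactly-once configuration with an explicit timeout override. The 4-hour value and
the URLs are illustrative; pick a timeout comfortably larger than the maximum
checkpoint duration plus the maximum restart duration.

    PulsarSink<String> sink = PulsarSink.<String>builder()
            .setServiceUrl("pulsar://localhost:6650")   // placeholder
            .setAdminUrl("http://localhost:8080")       // placeholder
            .setTopics("persistent://public/default/example-topic")
            .setProducerName("example-producer")        // avoids the producer-name warning
            .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // Illustrative 4-hour transaction timeout, in milliseconds.
            .setConfig(PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT, 4 * 60 * 60 * 1000L)
            .build();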
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index 0e16830..3a7c5bc 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash;
 
 import org.apache.pulsar.client.api.CompressionType;
 
@@ -38,12 +39,13 @@ import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+import static org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash.MURMUR3_32_HASH;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
 import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 
 /**
- * Configurations for PulsarSink. All the options list here could be configured in {@code
+ * Configurations for PulsarSink. All the options listed here could be configured in {@link
  * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
  * for pulsar source.
  *
@@ -99,6 +101,13 @@ public final class PulsarSinkOptions {
                     .withDescription(
                             "Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.");
 
+    public static final ConfigOption<MessageKeyHash> PULSAR_MESSAGE_KEY_HASH =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "messageKeyHash")
+                    .enumType(MessageKeyHash.class)
+                    .defaultValue(MURMUR3_32_HASH)
+                    .withDescription(
+                            "The hash policy for routing message by calculating the hash code of message key.");
+
     public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION =
             ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution")
                     .booleanType()
@@ -106,7 +115,8 @@ public final class PulsarSinkOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "If you enable this option, we would consume and deserialize the message by using Pulsar's %s.",
+                                            "If you enable this option and use PulsarSerializationSchema.pulsarSchema(),"
+                                                    + " we would consume and deserialize the message by using Pulsar's %s.",
                                             code("Schema"))
                                     .build());
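
The new PULSAR_MESSAGE_KEY_HASH option is a typed enum option, so it can be set
through the builder like any other config. The JAVA_HASH constant below is an
assumption about the MessageKeyHash enum, whose body lives in a separate file of
this commit; MURMUR3_32_HASH is the documented default.

    PulsarSinkBuilder<String> builder = PulsarSink.<String>builder()
            // Assumed alternative to the MURMUR3_32_HASH default.
            .setConfig(PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH, MessageKeyHash.JAVA_HASH);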
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java
new file mode 100644
index 0000000..cca8e80
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import java.util.Objects;
+
+/** The writer state for the Pulsar connector. It is used in the Pulsar committer. */
+@Internal
+public class PulsarCommittable {
+
+    /** The transaction id. */
+    private final TxnID txnID;
+
+    /** The topic name with partition information. */
+    private final String topic;
+
+    public PulsarCommittable(TxnID txnID, String topic) {
+        this.txnID = txnID;
+        this.topic = topic;
+    }
+
+    public TxnID getTxnID() {
+        return txnID;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarCommittable that = (PulsarCommittable) o;
+        return Objects.equals(txnID, that.txnID) && Objects.equals(topic, that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(txnID, topic);
+    }
+
+    @Override
+    public String toString() {
+        return "PulsarCommittable{" + "txnID=" + txnID + ", topic='" + topic + '\'' + '}';
+    }
+}
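
In context, a committable pairs the id of an open transaction with the topic it was
used for. A hedged sketch of its construction (Transaction#getTxnID comes from the
Pulsar client API; pulsarClient and topic here are placeholders):

    Transaction txn = PulsarTransactionUtils.createTransaction(pulsarClient, 600_000L);
    PulsarCommittable committable = new PulsarCommittable(txn.getTxnID(), topic);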
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java
new file mode 100644
index 0000000..324a7c6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** A serializer used to serialize {@link PulsarCommittable}. */
+public class PulsarCommittableSerializer implements SimpleVersionedSerializer<PulsarCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PulsarCommittable obj) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final DataOutputStream out = new DataOutputStream(baos)) {
+            TxnID txnID = obj.getTxnID();
+            out.writeLong(txnID.getMostSigBits());
+            out.writeLong(txnID.getLeastSigBits());
+            out.writeUTF(obj.getTopic());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PulsarCommittable deserialize(int version, byte[] serialized) throws IOException {
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                final DataInputStream in = new DataInputStream(bais)) {
+            long mostSigBits = in.readLong();
+            long leastSigBits = in.readLong();
+            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+            String topic = in.readUTF();
+            return new PulsarCommittable(txnID, topic);
+        }
+    }
+}
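
The wire format is two longs (the TxnID's most and least significant bits) followed
by the topic in modified UTF-8. A round-trip sketch using only the classes above:

    PulsarCommittableSerializer serializer = new PulsarCommittableSerializer();
    PulsarCommittable committable =
            new PulsarCommittable(new TxnID(1L, 2L), "persistent://public/default/example");

    byte[] bytes = serializer.serialize(committable);
    PulsarCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);

    // equals() compares both the TxnID and the topic.
    assert committable.equals(restored);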
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
new file mode 100644
index 0000000..8389bdc
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/**
+ * Committer implementation for {@link PulsarSink}.
+ *
+ * <p>The committer is responsible for finalizing the Pulsar transactions by committing them.
+ */
+@Internal
+public class PulsarCommitter implements Committer<PulsarCommittable>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+
+    private PulsarClient pulsarClient;
+    private TransactionCoordinatorClient coordinatorClient;
+
+    public PulsarCommitter(SinkConfiguration sinkConfiguration) {
+        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
+            throws IOException, InterruptedException {
+        TransactionCoordinatorClient client = transactionCoordinatorClient();
+
+        for (CommitRequest<PulsarCommittable> request : requests) {
+            PulsarCommittable committable = request.getCommittable();
+            TxnID txnID = committable.getTxnID();
+            String topic = committable.getTopic();
+
+            LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic);
+            try {
+                client.commit(txnID);
+            } catch (TransactionCoordinatorClientException e) {
+                // This is a known bug in the Pulsar transaction coordinator client:
+                // the concrete exception subtype is wrapped, so we have to unwrap it
+                // and use instanceof checks instead of catching the subtypes directly.
+                TransactionCoordinatorClientException ex = PulsarTransactionUtils.unwrap(e);
+                if (ex instanceof CoordinatorNotFoundException) {
+                    LOG.error(
+                            "We couldn't find the Transaction Coordinator from Pulsar broker {}. "
+                                    + "Check your broker configuration.",
+                            committable,
+                            ex);
+                    request.signalFailedWithKnownReason(ex);
+                } else if (ex instanceof InvalidTxnStatusException) {
+                    LOG.error(
+                            "Unable to commit transaction ({}) because it's in an invalid state. "
+                                    + "Most likely the transaction has been aborted for some reason. "
+                                    + "Please check the Pulsar broker logs for more details.",
+                            committable,
+                            ex);
+                    request.signalAlreadyCommitted();
+                } else if (ex instanceof TransactionNotFoundException) {
+                    if (request.getNumberOfRetries() == 0) {
+                        LOG.error(
+                                "Unable to commit transaction ({}) because it's not found on Pulsar broker. "
+                                        + "Most likely the checkpoint interval exceed the transaction timeout.",
+                                committable,
+                                ex);
+                        request.signalFailedWithKnownReason(ex);
+                    } else {
+                        LOG.warn(
+                                "We can't find the transaction {} after {} retry committing. "
+                                        + "This may mean that the transaction have been committed in previous but failed with timeout. "
+                                        + "So we just mark it as committed.",
+                                txnID,
+                                request.getNumberOfRetries());
+                        request.signalAlreadyCommitted();
+                    }
+                } else if (ex instanceof MetaStoreHandlerNotExistsException) {
+                    LOG.error(
+                            "We can't find the meta store handler by the mostSigBits from TxnID {}. "
+                                    + "Did you change the metadata for topic {}?",
+                            committable,
+                            TRANSACTION_COORDINATOR_ASSIGN,
+                            ex);
+                    request.signalFailedWithKnownReason(ex);
+                } else {
+                    LOG.error(
+                            "Encountered retriable exception while committing transaction {} for topic {}.",
+                            committable,
+                            topic,
+                            ex);
+                    int maxRecommitTimes = sinkConfiguration.getMaxRecommitTimes();
+                    if (request.getNumberOfRetries() < maxRecommitTimes) {
+                        request.retryLater();
+                    } else {
+                        String message =
+                                String.format(
+                                        "Failed to commit transaction %s after retrying %d times",
+                                        txnID, maxRecommitTimes);
+                        request.signalFailedWithKnownReason(new FlinkRuntimeException(message, ex));
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction ({}) encountered unknown error and data could be potentially lost.",
+                        committable,
+                        e);
+                request.signalFailedWithUnknownReason(e);
+            }
+        }
+    }
+
+    /**
+     * Lazily initialize the backing Pulsar client. This committer is not used with {@link
+     * DeliveryGuarantee#NONE} and {@link DeliveryGuarantee#AT_LEAST_ONCE}, so we don't create
+     * the Pulsar client eagerly.
+     */
+    private TransactionCoordinatorClient transactionCoordinatorClient() {
+        if (coordinatorClient == null) {
+            this.pulsarClient = createClient(sinkConfiguration);
+            this.coordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
+
+            // Ensure transactions have been enabled.
+            checkNotNull(coordinatorClient, "You haven't enabled transactions in the Pulsar client.");
+        }
+
+        return coordinatorClient;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index e0ef7ff..fe1204e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.connector.sink.Sink.InitContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
 
 import org.apache.pulsar.client.api.Schema;
 
@@ -31,6 +34,7 @@ import java.util.Objects;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
@@ -45,6 +49,7 @@ public class SinkConfiguration extends PulsarConfiguration {
     private final long transactionTimeoutMillis;
     private final long topicMetadataRefreshInterval;
     private final int partitionSwitchSize;
+    private final MessageKeyHash messageKeyHash;
     private final boolean enableSchemaEvolution;
     private final int maxPendingMessages;
     private final int maxRecommitTimes;
@@ -56,12 +61,13 @@ public class SinkConfiguration extends PulsarConfiguration {
         this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
         this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
         this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+        this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
         this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
     }
 
-    /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */
+    /** The delivery guarantee changes the behavior of {@link PulsarWriter}. */
     public DeliveryGuarantee getDeliveryGuarantee() {
         return deliveryGuarantee;
     }
@@ -92,9 +98,14 @@ public class SinkConfiguration extends PulsarConfiguration {
         return partitionSwitchSize;
     }
 
+    /** The message key's hash logic for routing the message into one Pulsar partition. */
+    public MessageKeyHash getMessageKeyHash() {
+        return messageKeyHash;
+    }
+
     /**
      * If we should serialize and send the message with a specified Pulsar {@link Schema} instead
-     * the default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}.
+     * the default {@link Schema#BYTES}. This switch is only used for {@link PulsarSchemaWrapper}.
      */
     public boolean isEnableSchemaEvolution() {
         return enableSchemaEvolution;
@@ -129,6 +140,7 @@ public class SinkConfiguration extends PulsarConfiguration {
                 && topicMetadataRefreshInterval == that.topicMetadataRefreshInterval
                 && partitionSwitchSize == that.partitionSwitchSize
                 && enableSchemaEvolution == that.enableSchemaEvolution
+                && messageKeyHash == that.messageKeyHash
                 && maxPendingMessages == that.maxPendingMessages
                 && maxRecommitTimes == that.maxRecommitTimes;
     }
@@ -140,6 +152,7 @@ public class SinkConfiguration extends PulsarConfiguration {
                 transactionTimeoutMillis,
                 topicMetadataRefreshInterval,
                 partitionSwitchSize,
+                messageKeyHash,
                 enableSchemaEvolution,
                 maxPendingMessages,
                 maxRecommitTimes);
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
new file mode 100644
index 0000000..9b3c931
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records to a Pulsar topic and for handling the different
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicMetadataListener metadataListener;
+    private final TopicRouter<IN> topicRouter;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSinkContext sinkContext;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+
+    private long pendingMessages = 0;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration The configuration to configure the Pulsar producer.
+     * @param serializationSchema Transform the incoming records into different message properties.
+     * @param metadataListener The listener for querying topic metadata.
+     * @param topicRouter Topic router to choose topic by incoming records.
+     * @param initContext Context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            TopicRouter<IN> topicRouter,
+            InitContext initContext) {
+        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+        this.serializationSchema = checkNotNull(serializationSchema);
+        this.metadataListener = checkNotNull(metadataListener);
+        this.topicRouter = checkNotNull(topicRouter);
+        checkNotNull(initContext);
+
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        this.metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        this.topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        try {
+            InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            this.serializationSchema.open(initializationContext, sinkContext, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+    }
+
+    @Override
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);
+
+        // Choose the right topic to send the message to.
+        String key = message.getKey();
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, key, availableTopics, sinkContext);
+
+        // Create the message builder for sending the message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
+
+        // Perform message sending.
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+            // We simply ignore any sending exception here. This may cause data loss.
+            builder.sendAsync();
+        } else {
+            // Wait for a permit before writing the message.
+            requirePermits();
+            mailboxExecutor.execute(
+                    () -> enqueueMessageSending(topic, builder),
+                    "Failed to send message to Pulsar");
+        }
+    }
+
+    private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
+            throws ExecutionException, InterruptedException {
+        // Block the mailbox thread until the message is acknowledged, so that the yield() calls
+        // in requirePermits() and flush() can make progress on the pending sends.
+        builder.sendAsync()
+                .whenComplete(
+                        (id, ex) -> {
+                            this.releasePermits();
+                            if (ex != null) {
+                                throw new FlinkRuntimeException(
+                                        "Failed to send data to Pulsar " + topic, ex);
+                            } else {
+                                LOG.debug(
+                                        "Sent message to Pulsar {} with message id {}", topic, id);
+                            }
+                        })
+                .get();
+    }
+
+    private void requirePermits() throws InterruptedException {
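+        // Backpressure: when the number of in-flight messages reaches the configured limit,
+        // process the enqueued mails (the pending sends); each of them blocks until its message
+        // is acknowledged and its permit has been released.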
+        while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
+            LOG.info("Waiting for available permits.");
+            mailboxExecutor.yield();
+        }
+        pendingMessages++;
+    }
+
+    private void releasePermits() {
+        this.pendingMessages -= 1;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private TypedMessageBuilder<?> createMessageBuilder(
+            String topic, Context context, PulsarMessage<?> message) {
+
+        Schema<?> schema = message.getSchema();
+        TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);
+
+        byte[] orderingKey = message.getOrderingKey();
+        if (orderingKey != null && orderingKey.length > 0) {
+            builder.orderingKey(orderingKey);
+        }
+
+        String key = message.getKey();
+        if (!Strings.isNullOrEmpty(key)) {
+            builder.key(key);
+        }
+
+        long eventTime = message.getEventTime();
+        if (eventTime > 0) {
+            builder.eventTime(eventTime);
+        } else {
+            // Fall back to the timestamp provided by Flink, if there is one.
+            Long timestamp = context.timestamp();
+            if (timestamp != null) {
+                builder.eventTime(timestamp);
+            }
+        }
+
+        // With schema evolution, the message is serialized by the Pulsar Schema inside the
+        // TypedMessageBuilder. The value type has already been checked in PulsarMessageBuilder#value.
+        ((TypedMessageBuilder) builder).value(message.getValue());
+
+        Map<String, String> properties = message.getProperties();
+        if (properties != null && !properties.isEmpty()) {
+            builder.properties(properties);
+        }
+
+        Long sequenceId = message.getSequenceId();
+        if (sequenceId != null) {
+            builder.sequenceId(sequenceId);
+        }
+
+        List<String> clusters = message.getReplicationClusters();
+        if (clusters != null && !clusters.isEmpty()) {
+            builder.replicationClusters(clusters);
+        }
+
+        if (message.isDisableReplication()) {
+            builder.disableReplication();
+        }
+
+        return builder;
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        if (endOfInput) {
+            // Flush only once when we reach the end of the input.
+            producerRegister.flush();
+        } else {
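+            // Keep flushing and yielding until every in-flight message has been acknowledged;
+            // yield() drives the enqueued sending mails to completion and releases their permits.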
+            while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
+                producerRegister.flush();
+                LOG.info("Flush the pending messages to Pulsar.");
+                mailboxExecutor.yield();
+            }
+        }
+    }
+
+    @Override
+    public Collection<PulsarCommittable> prepareCommit() {
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            return producerRegister.prepareCommit();
+        } else {
+            return emptyList();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Close all the resources and throw the exception at last.
+        closeAll(metadataListener, producerRegister);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
new file mode 100644
index 0000000..5c93339
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.context;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This context provides information on the Pulsar record's target location and on the runtime
+ * environment of the sink subtask.
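+ *
+ * <p>A minimal sketch of how the methods defined below could be used, e.g. inside a custom
+ * serialization schema or topic router (the {@code sinkContext} variable stands for whatever
+ * instance those callbacks receive):
+ *
+ * <pre>{@code
+ * int subtask = sinkContext.getParallelInstanceId();
+ * int parallelism = sinkContext.getNumberOfParallelInstances();
+ * long now = sinkContext.processTime();
+ * }</pre>
+ */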
+@PublicEvolving
+public interface PulsarSinkContext {
+
+    /**
+     * Get the number of the subtask that the PulsarSink is running on. The numbering starts from 0
+     * and goes up to parallelism - 1 (parallelism as returned by {@link
+     * #getNumberOfParallelInstances()}).
+     *
+     * @return The index of the subtask.
+     */
+    int getParallelInstanceId();
+
+    /** @return The number of parallel PulsarSink tasks. */
+    int getNumberOfParallelInstances();
+
+    /**
+     * Pulsar can validate the schema and upgrade it automatically. If this option is enabled, the
+     * record isn't serialized into bytes beforehand; instead it is handed to the Pulsar client,
+     * which serializes it with the registered schema.
+     */
+    boolean isEnableSchemaEvolution();
+
+    /** Returns the current processing time in Flink. */
+    long processTime();
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
new file mode 100644
index 0000000..681b25a
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.context;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+
+/** An implementation of {@link PulsarSinkContext} that exposes the required runtime context. */
+@Internal
+public class PulsarSinkContextImpl implements PulsarSinkContext {
+
+    private final int numberOfParallelSubtasks;
+    private final int parallelInstanceId;
+    private final ProcessingTimeService processingTimeService;
+    private final boolean enableSchemaEvolution;
+
+    public PulsarSinkContextImpl(InitContext initContext, SinkConfiguration sinkConfiguration) {
+        this.parallelInstanceId = initContext.getSubtaskId();
+        this.numberOfParallelSubtasks = initContext.getNumberOfParallelSubtasks();
+        this.processingTimeService = initContext.getProcessingTimeService();
+        this.enableSchemaEvolution = sinkConfiguration.isEnableSchemaEvolution();
+    }
+
+    @Override
+    public int getParallelInstanceId() {
+        return parallelInstanceId;
+    }
+
+    @Override
+    public int getNumberOfParallelInstances() {
+        return numberOfParallelSubtasks;
+    }
+
+    @Override
+    public boolean isEnableSchemaEvolution() {
+        return enableSchemaEvolution;
+    }
+
+    @Override
+    public long processTime() {
+        return processingTimeService.getCurrentProcessingTime();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java
new file mode 100644
index 0000000..433d79c
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
+import org.apache.pulsar.client.impl.Hash;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+/**
+ * The implementation used when the {@link TopicRoutingMode#MESSAGE_KEY_HASH} policy is chosen. We
+ * pick the topic by the hash code of the message key. If no message key was provided, we pick one
+ * topic at random.
+ *
+ * @param <IN> The message type which should be written to Pulsar.
+ */
+@Internal
+public class KeyHashTopicRouter<IN> implements TopicRouter<IN> {
+    private static final long serialVersionUID = 2475614648095079804L;
+
+    private final MessageKeyHash messageKeyHash;
+
+    public KeyHashTopicRouter(SinkConfiguration sinkConfiguration) {
+        this.messageKeyHash = sinkConfiguration.getMessageKeyHash();
+    }
+
+    @Override
+    public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) {
+        checkArgument(
+                !partitions.isEmpty(),
+                "You should provide topics for routing topic by message key hash.");
+
+        int topicIndex;
+        if (Strings.isNullOrEmpty(key)) {
+            // Randomly pick one topic to write to.
+            topicIndex = ThreadLocalRandom.current().nextInt(partitions.size());
+        } else {
+            // Hash the message key and choose the topic to write to.
+            Hash hash = messageKeyHash.getHash();
+            int code = hash.makeHash(key);
+            topicIndex = signSafeMod(code, partitions.size());
+        }
+
+        return partitions.get(topicIndex);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
new file mode 100644
index 0000000..7f35760
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import org.apache.pulsar.client.impl.Hash;
+import org.apache.pulsar.client.impl.JavaStringHash;
+import org.apache.pulsar.client.impl.Murmur3_32Hash;
+
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** The predefined hash functions available for routing messages by key. */
+@PublicEvolving
+public enum MessageKeyHash implements DescribedEnum {
+
+    /** Use regular <code>String.hashCode()</code>. */
+    JAVA_HASH(
+            "java-hash",
+            text(
+                    "This hash would use %s to calculate the message key string's hash code.",
+                    code("String.hashCode()"))) {
+        @Override
+        public Hash getHash() {
+            return JavaStringHash.getInstance();
+        }
+    },
+    /**
+     * Use Murmur3 hashing function. <a
+     * href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
+     */
+    MURMUR3_32_HASH(
+            "murmur-3-32-hash",
+            text(
+                    "This hash would calculate message key's hash code by using %s algorithm.",
+                    link("https://en.wikipedia.org/wiki/MurmurHash", "Murmur3"))) {
+        @Override
+        public Hash getHash() {
+            return Murmur3_32Hash.getInstance();
+        }
+    };
+
+    private final String name;
+    private final InlineElement desc;
+
+    MessageKeyHash(String name, InlineElement desc) {
+        this.name = name;
+        this.desc = desc;
+    }
+
+    @Internal
+    public abstract Hash getHash();
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    @Internal
+    @Override
+    public InlineElement getDescription() {
+        return desc;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java
new file mode 100644
index 0000000..b9c654a
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * The implementation used when the {@link TopicRoutingMode#ROUND_ROBIN} policy is chosen. We pick
+ * the topics one by one, switching to the next one after a fixed number of messages. For example,
+ * with a partition switch size of 10 and 4 partitions, messages 0-9 go to partition 0, messages
+ * 10-19 to partition 1, and so on, wrapping around after partition 3.
+ *
+ * @param <IN> The message type which should be written to Pulsar.
+ */
+@Internal
+public class RoundRobinTopicRouter<IN> implements TopicRouter<IN> {
+    private static final long serialVersionUID = -1160533263474038206L;
+
+    /** The internal counter for counting the messages. */
+    private final AtomicLong counter = new AtomicLong(0);
+
+    /** The number of messages after which we switch to another topic. */
+    private final int partitionSwitchSize;
+
+    public RoundRobinTopicRouter(SinkConfiguration configuration) {
+        this.partitionSwitchSize = configuration.getPartitionSwitchSize();
+    }
+
+    @Override
+    public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) {
+        checkArgument(
+                !partitions.isEmpty(),
+                "You should provide topics for routing topic by message key hash.");
+
+        long counts = counter.getAndAdd(1);
+        long index = (counts / partitionSwitchSize) % partitions.size();
+        // The long counter may overflow and turn negative; guard it before casting to int.
+        int topicIndex = (int) (Math.abs(index) % Integer.MAX_VALUE);
+
+        return partitions.get(topicIndex);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java
new file mode 100644
index 0000000..a2c0589
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The router for choosing the desired topic to which the Flink records are written. Users can
+ * implement this router for complex requirements; we have provided some easy-to-use
+ * implementations, and a sketch of a custom router is shown below.
+ *
+ * <p>This topic router is stateless and has no initialization logic by default. Make sure your
+ * implementation doesn't rely on dynamic state.
+ *
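+ * <p>A minimal sketch of a custom router which always picks the first available partition (the
+ * class name and the routing logic are illustrative only):
+ *
+ * <pre>{@code
+ * class FirstPartitionRouter implements TopicRouter<String> {
+ *     public String route(
+ *             String in, String key, List<String> partitions, PulsarSinkContext context) {
+ *         return partitions.get(0);
+ *     }
+ * }
+ * }</pre>
+ *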
+ * @param <IN> The record type that needs to be written to Pulsar.
+ */
+@PublicEvolving
+public interface TopicRouter<IN> extends Serializable {
+
+    /**
+     * Choose the topic by the given record and the available partition list. You can return a new
+     * topic name if you need to.
+     *
+     * @param in The record instance which needs to be written to Pulsar.
+     * @param key The key of the message from {@link PulsarMessageBuilder#key(String)}. It could be
+     *     null if the message doesn't have a key.
+     * @param partitions The available partition list. This could be empty if you don't provide any
+     *     topics in {@link PulsarSinkBuilder#setTopics(String...)}. You can return a custom topic,
+     *     but make sure its name contains a partition index. Using {@link
+     *     TopicNameUtils#topicNameWithPartition(String, int)} makes it easy to create a topic name
+     *     with a partition index.
+     * @param context The context contains useful information for determining the topic.
+     * @return The topic name to use.
+     */
+    String route(IN in, String key, List<String> partitions, PulsarSinkContext context);
+
+    /** Override this method if you have non-serializable fields that need initialization. */
+    default void open(SinkConfiguration sinkConfiguration) {
+        // Nothing to do by default.
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
new file mode 100644
index 0000000..c327435
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.router;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+
+/** The routing policy for choosing the desired topic by the given message. */
+@PublicEvolving
+public enum TopicRoutingMode implements DescribedEnum {
+
+    /**
+     * The producer will publish messages across all partitions in a round-robin fashion to achieve
+     * maximum throughput. Please note that round-robin is not done per individual message but
+     * rather it's set to the same boundary of batching delay, to ensure batching is effective.
+     */
+    ROUND_ROBIN(
+            "round-robin",
+            text(
+                    "The producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput."
+                            + " Please note that round-robin is not done per individual message"
+                            + " but rather it's set to the same boundary of %s, to ensure batching is effective.",
+                    code(PULSAR_BATCHING_MAX_MESSAGES.key()))),
+
+    /**
+     * If no key is provided, the partitioned producer will randomly pick one single topic partition
+     * and publish all the messages into that partition. If a key is provided on the message, the
+     * partitioned producer will hash the key and assign the message to a particular partition.
+     */
+    MESSAGE_KEY_HASH(
+            "message-key-hash",
+            text(
+                    "If no key is provided, The partitioned producer will randomly pick one single topic partition"
+                            + " and publish all the messages into that partition. If a key is provided on the message,"
+                            + " the partitioned producer will hash the key and assign the message to a particular partition.")),
+
+    /**
+     * Use a custom topic router implementation that will be called to determine the partition for
+     * a particular message.
+     */
+    CUSTOM(
+            "custom",
+            text(
+                    "Use custom %s implementation that will be called to determine the partition for a particular message.",
+                    code(TopicRouter.class.getSimpleName())));
+
+    private final String name;
+    private final InlineElement desc;
+
+    TopicRoutingMode(String name, InlineElement desc) {
+        this.name = name;
+        this.desc = desc;
+    }
+
+    @Internal
+    @Override
+    public InlineElement getDescription() {
+        return desc;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
new file mode 100644
index 0000000..acd1c61
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Objects;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+
+/**
+ * We need the latest topic metadata to make sure that newly created topic partitions are used by
+ * the Pulsar sink. This routing behavior differs from the Pulsar client's built-in logic. We use
+ * Flink's {@link ProcessingTimeService} as the executor for the periodic refresh.
+ */
+@Internal
+public class TopicMetadataListener implements Serializable, Closeable {
+    private static final long serialVersionUID = 6186948471557507522L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicMetadataListener.class);
+
+    private final ImmutableList<String> partitionedTopics;
+    private final Map<String, Integer> topicMetadata;
+    private volatile ImmutableList<String> availableTopics;
+
+    // Dynamic fields.
+    private transient PulsarAdmin pulsarAdmin;
+    private transient Long topicMetadataRefreshInterval;
+    private transient ProcessingTimeService timeService;
+
+    public TopicMetadataListener() {
+        this(emptyList());
+    }
+
+    public TopicMetadataListener(List<String> topics) {
+        List<String> partitions = new ArrayList<>(topics.size());
+        Map<String, Integer> metadata = new HashMap<>(topics.size());
+        for (String topic : topics) {
+            if (isPartitioned(topic)) {
+                partitions.add(topic);
+            } else {
+                // This will be updated when the writer is opened.
+                metadata.put(topic, -1);
+            }
+        }
+
+        this.partitionedTopics = ImmutableList.copyOf(partitions);
+        this.topicMetadata = metadata;
+        this.availableTopics = ImmutableList.of();
+    }
+
+    /** Register the topic metadata update with the processing time service. */
+    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) {
+        if (topicMetadata.isEmpty()) {
+            LOG.info("No topics have been provided, skip listener initialization.");
+            return;
+        }
+
+        // Initialize listener properties.
+        this.pulsarAdmin = createAdmin(sinkConfiguration);
+        this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
+        this.timeService = timeService;
+
+        // Initialize the topic metadata. Quit if we fail to connect to Pulsar.
+        sneakyAdmin(this::updateTopicMetadata);
+
+        // Register time service.
+        triggerNextTopicMetadataUpdate(true);
+    }
+
+    /**
+     * Return all the available topic partitions. We recalculate the partitions if the topic
+     * metadata has changed; otherwise, we return the cached result for better performance.
+     */
+    public List<String> availableTopics() {
+        if (availableTopics.isEmpty()
+                && (!partitionedTopics.isEmpty() || !topicMetadata.isEmpty())) {
+            List<String> results = new ArrayList<>();
+            for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) {
+                for (int i = 0; i < entry.getValue(); i++) {
+                    results.add(topicNameWithPartition(entry.getKey(), i));
+                }
+            }
+
+            results.addAll(partitionedTopics);
+            this.availableTopics = ImmutableList.copyOf(results);
+        }
+
+        return availableTopics;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pulsarAdmin != null) {
+            pulsarAdmin.close();
+        }
+    }
+
+    private void triggerNextTopicMetadataUpdate(boolean initial) {
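+        // This method reschedules itself: after each run it registers a new timer, so the
+        // metadata refresh keeps firing at the configured interval without a dedicated thread.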
+        if (!initial) {
+            // Update the topic metadata, ignoring any Pulsar admin exception.
+            try {
+                updateTopicMetadata();
+            } catch (PulsarAdminException e) {
+                LOG.warn("Failed to refresh the topic metadata, will retry on the next trigger.", e);
+            }
+        }
+
+        // Register next timer.
+        long currentProcessingTime = timeService.getCurrentProcessingTime();
+        long triggerTime = currentProcessingTime + topicMetadataRefreshInterval;
+        timeService.registerTimer(triggerTime, time -> triggerNextTopicMetadataUpdate(false));
+    }
+
+    private void updateTopicMetadata() throws PulsarAdminException {
+        boolean shouldUpdate = false;
+
+        for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) {
+            String topic = entry.getKey();
+            PartitionedTopicMetadata metadata =
+                    pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
+
+            // Update topic metadata if it has been changed.
+            if (!Objects.equal(entry.getValue(), metadata.partitions)) {
+                entry.setValue(metadata.partitions);
+                shouldUpdate = true;
+            }
+        }
+
+        // Clear available topics if the topic metadata has been changed.
+        if (shouldUpdate) {
+            this.availableTopics = ImmutableList.of();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
new file mode 100644
index 0000000..9bb1753
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar producers share the same client, but each producer holds its own message queue
+ * for a specific topic. So we have to create different producer instances for different topics.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+    private final PulsarClient pulsarClient;
+    private final SinkConfiguration sinkConfiguration;
+    private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;
+    private final Map<String, Transaction> transactionRegister;
+
+    public TopicProducerRegister(SinkConfiguration sinkConfiguration) {
+        this.pulsarClient = createClient(sinkConfiguration);
+        this.sinkConfiguration = sinkConfiguration;
+        this.producerRegister = new HashMap<>();
+        this.transactionRegister = new HashMap<>();
+    }
+
+    /**
+     * Create a TypedMessageBuilder which could be sent to Pulsar directly. First, we create a
+     * topic-related producer or reuse a cached one. Then, for exactly-once delivery, we look up the
+     * topic-related transaction and generate a new transaction instance if there is none. Finally,
+     * we create the message builder on top of the producer and transaction.
+     */
+    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, Schema<T> schema) {
+        Producer<T> producer = getOrCreateProducer(topic, schema);
+        DeliveryGuarantee deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            Transaction transaction = getOrCreateTransaction(topic);
+            return producer.newMessage(transaction);
+        } else {
+            return producer.newMessage();
+        }
+    }
+
+    /**
+     * Convert the open transactions into a committable list for the Pulsar committer. The
+     * transactions are removed from the register here; new ones are created when messages arrive
+     * after the checkpoint.
+     */
+    public List<PulsarCommittable> prepareCommit() {
+        List<PulsarCommittable> committables = new ArrayList<>(transactionRegister.size());
+        transactionRegister.forEach(
+                (topic, transaction) -> {
+                    TxnID txnID = transaction.getTxnID();
+                    PulsarCommittable committable = new PulsarCommittable(txnID, topic);
+                    committables.add(committable);
+                });
+
+        clearTransactions();
+        return committables;
+    }
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been
+     * successfully persisted.
+     */
+    public void flush() throws IOException {
+        Collection<Map<SchemaInfo, Producer<?>>> collection = producerRegister.values();
+        for (Map<SchemaInfo, Producer<?>> producers : collection) {
+            for (Producer<?> producer : producers.values()) {
+                producer.flush();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try (Closer closer = Closer.create()) {
+            // Flush all the pending messages to Pulsar. This shouldn't throw any exception.
+            closer.register(this::flush);
+
+            // Abort all existing transactions.
+            closer.register(this::abortTransactions);
+
+            // Remove all the producers.
+            closer.register(producerRegister::clear);
+
+            // All the producers would be closed by this method.
+            // We would block until all the producers have been successfully closed.
+            closer.register(pulsarClient);
+        }
+    }
+
+    /** Create or return the cached topic-related producer. */
+    @SuppressWarnings("unchecked")
+    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
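+        // Producers are cached per (topic, schema) pair, since the same topic may be written
+        // with different schemas when schema evolution is involved.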
+        Map<SchemaInfo, Producer<?>> producers =
+                producerRegister.computeIfAbsent(topic, key -> new HashMap<>());
+        SchemaInfo schemaInfo = schema.getSchemaInfo();
+
+        if (producers.containsKey(schemaInfo)) {
+            return (Producer<T>) producers.get(schemaInfo);
+        } else {
+            ProducerBuilder<T> builder =
+                    createProducerBuilder(pulsarClient, schema, sinkConfiguration);
+            // Set the required topic name.
+            builder.topic(topic);
+            Producer<T> producer = sneakyClient(builder::create);
+            producers.put(schemaInfo, producer);
+
+            return producer;
+        }
+    }
+
+    /**
+     * Get the cached topic-related transaction, or create a new one after a checkpoint has removed
+     * the previous transactions from the register.
+     */
+    private Transaction getOrCreateTransaction(String topic) {
+        return transactionRegister.computeIfAbsent(
+                topic,
+                t -> {
+                    long timeoutMillis = sinkConfiguration.getTransactionTimeoutMillis();
+                    return createTransaction(pulsarClient, timeoutMillis);
+                });
+    }
+
+    /** Abort all existing transactions. This method is used when closing the PulsarWriter. */
+    private void abortTransactions() {
+        if (transactionRegister.isEmpty()) {
+            return;
+        }
+
+        TransactionCoordinatorClient coordinatorClient =
+                ((PulsarClientImpl) pulsarClient).getTcClient();
+        // This null check makes sure that transactions are enabled in the client.
+        checkNotNull(coordinatorClient);
+
+        try (Closer closer = Closer.create()) {
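+            // Register every abort with the Closer so that all of them are attempted even if one
+            // fails; the first failure is rethrown after the remaining aborts have run.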
+            for (Transaction transaction : transactionRegister.values()) {
+                TxnID txnID = transaction.getTxnID();
+                closer.register(() -> coordinatorClient.abort(txnID));
+            }
+
+            clearTransactions();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /**
+     * Clear the transaction register. All transactions should have been passed to the Pulsar
+     * committer by now; a new transaction is created when the next message arrives.
+     */
+    private void clearTransactions() {
+        transactionRegister.clear();
+    }
+}