You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:06 UTC

[flink] 04/09: [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9bc8b0f37bec419bcdc4b8cdee3abf5320df5399
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 14:56:54 2022 +0800

    [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData.
---
 .../connector/pulsar/sink/PulsarSinkOptions.java   | 259 +++++++++++++++++++++
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  | 112 +++++++++
 .../pulsar/sink/config/SinkConfiguration.java      | 147 ++++++++++++
 3 files changed, 518 insertions(+)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
new file mode 100644
index 0000000..0e16830
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+
+/**
+ * Configurations for PulsarSink. All the options list here could be configured in {@code
+ * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
+ * for pulsar source.
+ *
+ * @see PulsarOptions for shared configure options.
+ */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(name = "PulsarSink", keyPrefix = SINK_CONFIG_PREFIX),
+            @ConfigGroup(name = "PulsarProducer", keyPrefix = PRODUCER_CONFIG_PREFIX)
+        })
+public final class PulsarSinkOptions {
+
+    // Pulsar sink connector config prefix.
+    public static final String SINK_CONFIG_PREFIX = "pulsar.sink.";
+    // Pulsar producer API config prefix.
+    public static final String PRODUCER_CONFIG_PREFIX = "pulsar.producer.";
+
+    private PulsarSinkOptions() {
+        // This is a constant class
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for pulsar sink part.
+    // All the configuration listed below should have the pulsar.sink prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<DeliveryGuarantee> PULSAR_WRITE_DELIVERY_GUARANTEE =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "deliveryGuarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<Long> PULSAR_WRITE_TRANSACTION_TIMEOUT =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "transactionTimeoutMillis")
+                    .longType()
+                    .defaultValue(Duration.ofHours(3).toMillis())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This option is used when the user require the %s semantic.",
+                                            code("DeliveryGuarantee.EXACTLY_ONCE"))
+                                    .text(
+                                            "We would use transaction for making sure the message could be write only once.")
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_TOPIC_METADATA_REFRESH_INTERVAL =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "topicMetadataRefreshInterval")
+                    .longType()
+                    .defaultValue(Duration.ofMinutes(30).toMillis())
+                    .withDescription(
+                            "Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.");
+
+    public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If you enable this option, we would consume and deserialize the message by using Pulsar's %s.",
+                                            code("Schema"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_RECOMMIT_TIMES =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "maxRecommitTimes")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The allowed transaction recommit times if we meet some retryable exception."
+                                    + " This is used in Pulsar Transaction.");
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for ProducerConfigurationData part.
+    // All the configuration listed below should have the pulsar.producer prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<String> PULSAR_PRODUCER_NAME =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "producerName")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A producer name which would be displayed in the Pulsar's dashboard."
+                                    + " If no producer name was provided, we would use a Pulsar generated name instead.");
+
+    public static final ConfigOption<Long> PULSAR_SEND_TIMEOUT_MS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "sendTimeoutMs")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message send timeout in ms.")
+                                    .text(
+                                            "If a message is not acknowledged by a server before the %s expires, an error occurs.",
+                                            code("sendTimeout"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES)
+                    .withDescription(
+                            Description.builder()
+                                    .text("The maximum size of a queue holding pending messages.")
+                                    .linebreak()
+                                    .text(
+                                            "For example, a message waiting to receive an acknowledgment from a %s.",
+                                            link(
+                                                    "broker",
+                                                    "https://pulsar.apache.org/docs/en/reference-terminology#broker"))
+                                    .linebreak()
+                                    .text(
+                                            "By default, when the queue is full, all calls to the %s and %s methods fail unless you set %s to true.",
+                                            code("Send"),
+                                            code("SendAsync"),
+                                            code("BlockIfQueueFull"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessagesAcrossPartitions")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of pending messages across partitions.")
+                                    .linebreak()
+                                    .text(
+                                            "Use the setting to lower the max pending messages for each partition (%s) if the total number exceeds the configured value.",
+                                            code("setMaxPendingMessages"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxPublishDelayMicros")
+                    .longType()
+                    .defaultValue(MILLISECONDS.toMicros(1))
+                    .withDescription("Batching time period of sending messages.");
+
+    public static final ConfigOption<Integer>
+            PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY =
+                    ConfigOptions.key(
+                                    PRODUCER_CONFIG_PREFIX
+                                            + "batchingPartitionSwitchFrequencyByPublishDelay")
+                            .intType()
+                            .defaultValue(10)
+                            .withDescription(
+                                    "The maximum wait time for switching topic partitions.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_BATCHING_MAX_MESSAGES)
+                    .withDescription("The maximum number of messages permitted in a batch.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_BYTES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxBytes")
+                    .intType()
+                    .defaultValue(128 * 1024)
+                    .withDescription(
+                            "The maximum size of messages permitted in a batch. Keep the maximum consistent as previous versions.");
+
+    public static final ConfigOption<Boolean> PULSAR_BATCHING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingEnabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable batch send ability, it was enabled by default.");
+
+    public static final ConfigOption<Boolean> PULSAR_CHUNKING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "chunkingEnabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("");
+
+    public static final ConfigOption<CompressionType> PULSAR_COMPRESSION_TYPE =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "compressionType")
+                    .enumType(CompressionType.class)
+                    .defaultValue(CompressionType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message data compression type used by a producer.")
+                                    .text("Available options:")
+                                    .list(
+                                            link("LZ4", "https://github.com/lz4/lz4"),
+                                            link("ZLIB", "https://zlib.net/"),
+                                            link("ZSTD", "https://facebook.github.io/zstd/"),
+                                            link("SNAPPY", "https://google.github.io/snappy/"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_INITIAL_SEQUENCE_ID =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "initialSequenceId")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.");
+
+    public static final ConfigOption<Map<String, String>> PULSAR_PRODUCER_PROPERTIES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "properties")
+                    .mapType()
+                    .defaultValue(emptyMap())
+                    .withDescription(
+                            Description.builder()
+                                    .text("A name or value property of this consumer.")
+                                    .text(
+                                            " %s is application defined metadata attached to a consumer.",
+                                            code("properties"))
+                                    .text(
+                                            " When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
+                                    .build());
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
new file mode 100644
index 0000000..13821fe
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigValidator;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Map;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_BYTES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_CHUNKING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_COMPRESSION_TYPE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_INITIAL_SEQUENCE_ID;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_PROPERTIES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static org.apache.pulsar.client.api.MessageRoutingMode.SinglePartition;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+
+/** Helpers that validate the sink config and create the {@link Producer} builder from it. */
+@Internal
+public final class PulsarSinkConfigUtils {
+
+    public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
+            PulsarConfigValidator.builder()
+                    .requiredOption(PULSAR_SERVICE_URL)
+                    .requiredOption(PULSAR_ADMIN_URL)
+                    .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
+                    .build();
+
+    private PulsarSinkConfigUtils() {
+        // Static utility holder, never instantiated.
+    }
+
+    /** Build a {@link ProducerBuilder} for the given schema out of the sink configuration. */
+    public static <T> ProducerBuilder<T> createProducerBuilder(
+            PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
+        ProducerBuilder<T> builder = client.newProducer(schema);
+
+        configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName);
+        configuration.useOption(
+                PULSAR_SEND_TIMEOUT_MS,
+                Math::toIntExact,
+                timeout -> builder.sendTimeout(timeout, MILLISECONDS));
+        configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
+        configuration.useOption(
+                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
+                builder::maxPendingMessagesAcrossPartitions);
+        configuration.useOption(
+                PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
+                delay -> builder.batchingMaxPublishDelay(delay, MICROSECONDS));
+        configuration.useOption(
+                PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY,
+                builder::roundRobinRouterBatchingPartitionSwitchFrequency);
+        configuration.useOption(PULSAR_BATCHING_MAX_MESSAGES, builder::batchingMaxMessages);
+        configuration.useOption(PULSAR_BATCHING_MAX_BYTES, builder::batchingMaxBytes);
+        configuration.useOption(PULSAR_BATCHING_ENABLED, builder::enableBatching);
+        configuration.useOption(PULSAR_CHUNKING_ENABLED, builder::enableChunking);
+        configuration.useOption(PULSAR_COMPRESSION_TYPE, builder::compressionType);
+        configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, builder::initialSequenceId);
+
+        // Attach the user-defined producer properties, if any were configured.
+        Map<String, String> producerProperties =
+                configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
+        if (!producerProperties.isEmpty()) {
+            builder.properties(producerProperties);
+        }
+
+        // Fixed settings of the sink: a non-partitioned producer which is not meant to change.
+        builder.blockIfQueueFull(true);
+        builder.messageRoutingMode(SinglePartition);
+        builder.enableMultiSchema(false);
+        builder.autoUpdatePartitions(false);
+        builder.accessMode(Shared);
+        builder.enableLazyStartPartitionedProducers(false);
+
+        return builder;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
new file mode 100644
index 0000000..e0ef7ff
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Objects;
+
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+
+/** The configuration class for the Pulsar sink. */
+@PublicEvolving
+public class SinkConfiguration extends PulsarConfiguration {
+    private static final long serialVersionUID = 4941360605051251153L;
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final long transactionTimeoutMillis;
+    private final long topicMetadataRefreshInterval;
+    private final int partitionSwitchSize;
+    private final boolean enableSchemaEvolution;
+    private final int maxPendingMessages;
+    private final int maxRecommitTimes;
+
+    public SinkConfiguration(Configuration configuration) {
+        super(configuration);
+
+        this.deliveryGuarantee = get(PULSAR_WRITE_DELIVERY_GUARANTEE);
+        this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+        this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
+        this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
+        this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
+        // The two values below are derived from producer options (pulsar.producer.*).
+        this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+        this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+    }
+
+    /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    /**
+     * Pulsar's transactions have a timeout mechanism for the uncommitted transaction. We use
+     * transactions for making sure the message could be written only once. Since the checkpoint
+     * interval couldn't be acquired from {@link InitContext}, we have to expose this option. Make
+     * sure this value is greater than the checkpoint interval.
+     */
+    public long getTransactionTimeoutMillis() {
+        return transactionTimeoutMillis;
+    }
+
+    /** The interval (in ms) for auto-updating the topic metadata. Defaults to 30 minutes. */
+    public long getTopicMetadataRefreshInterval() {
+        return topicMetadataRefreshInterval;
+    }
+
+    /**
+     * Switch the partition to write when we have written the given size of messages. It's used for
+     * a round-robin topic router.
+     */
+    public int getPartitionSwitchSize() {
+        return partitionSwitchSize;
+    }
+
+    /**
+     * Whether to serialize and send messages with the given Pulsar {@link Schema} rather than the
+     * default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}.
+     */
+    public boolean isEnableSchemaEvolution() {
+        return enableSchemaEvolution;
+    }
+
+    /**
+     * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
+     * Pulsar writer instance.
+     */
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /** The maximum allowed recommit times for a Pulsar transaction. */
+    public int getMaxRecommitTimes() {
+        return maxRecommitTimes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        SinkConfiguration that = (SinkConfiguration) o;
+        return deliveryGuarantee == that.deliveryGuarantee
+                && transactionTimeoutMillis == that.transactionTimeoutMillis
+                && topicMetadataRefreshInterval == that.topicMetadataRefreshInterval
+                && partitionSwitchSize == that.partitionSwitchSize
+                && enableSchemaEvolution == that.enableSchemaEvolution
+                && maxPendingMessages == that.maxPendingMessages
+                && maxRecommitTimes == that.maxRecommitTimes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                deliveryGuarantee,
+                transactionTimeoutMillis,
+                topicMetadataRefreshInterval,
+                partitionSwitchSize,
+                enableSchemaEvolution,
+                maxPendingMessages,
+                maxRecommitTimes);
+    }
+}