You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:06 UTC
[flink] 04/09: [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9bc8b0f37bec419bcdc4b8cdee3abf5320df5399
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 14:56:54 2022 +0800
[FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData.
---
.../connector/pulsar/sink/PulsarSinkOptions.java | 259 +++++++++++++++++++++
.../pulsar/sink/config/PulsarSinkConfigUtils.java | 112 +++++++++
.../pulsar/sink/config/SinkConfiguration.java | 147 ++++++++++++
3 files changed, 518 insertions(+)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
new file mode 100644
index 0000000..0e16830
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+
+/**
+ * Configurations for PulsarSink. All the options listed here could be configured in {@code
+ * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
+ * for pulsar sink.
+ *
+ * @see PulsarOptions for shared configure options.
+ */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(name = "PulsarSink", keyPrefix = SINK_CONFIG_PREFIX),
+            @ConfigGroup(name = "PulsarProducer", keyPrefix = PRODUCER_CONFIG_PREFIX)
+        })
+public final class PulsarSinkOptions {
+
+    // Key prefix shared by every option of the sink connector itself.
+    public static final String SINK_CONFIG_PREFIX = "pulsar.sink.";
+    // Key prefix shared by every option that is handed to the Pulsar producer API.
+    public static final String PRODUCER_CONFIG_PREFIX = "pulsar.producer.";
+
+    private PulsarSinkOptions() {
+        // This is a constant class
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for pulsar sink part.
+    // All the configuration listed below should have the pulsar.sink prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<DeliveryGuarantee> PULSAR_WRITE_DELIVERY_GUARANTEE =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "deliveryGuarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<Long> PULSAR_WRITE_TRANSACTION_TIMEOUT =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "transactionTimeoutMillis")
+                    .longType()
+                    .defaultValue(Duration.ofHours(3).toMillis())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This option is used when the user require the %s semantic.",
+                                            code("DeliveryGuarantee.EXACTLY_ONCE"))
+                                    .text(
+                                            " We would use transaction for making sure the message could be write only once.")
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_TOPIC_METADATA_REFRESH_INTERVAL =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "topicMetadataRefreshInterval")
+                    .longType()
+                    .defaultValue(Duration.ofMinutes(30).toMillis())
+                    .withDescription(
+                            "Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.");
+
+    public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If you enable this option, we would serialize and send the message by using Pulsar's %s.",
+                                            code("Schema"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_RECOMMIT_TIMES =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "maxRecommitTimes")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The allowed transaction recommit times if we meet some retryable exception."
+                                    + " This is used in Pulsar Transaction.");
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for ProducerConfigurationData part.
+    // All the configuration listed below should have the pulsar.producer prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<String> PULSAR_PRODUCER_NAME =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "producerName")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A producer name which would be displayed in the Pulsar's dashboard."
+                                    + " If no producer name was provided, we would use a Pulsar generated name instead.");
+
+    public static final ConfigOption<Long> PULSAR_SEND_TIMEOUT_MS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "sendTimeoutMs")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message send timeout in ms.")
+                                    .text(
+                                            " If a message is not acknowledged by a server before the %s expires, an error occurs.",
+                                            code("sendTimeout"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES)
+                    .withDescription(
+                            Description.builder()
+                                    .text("The maximum size of a queue holding pending messages.")
+                                    .linebreak()
+                                    .text(
+                                            "For example, a message waiting to receive an acknowledgment from a %s.",
+                                            link(
+                                                    "https://pulsar.apache.org/docs/en/reference-terminology#broker",
+                                                    "broker"))
+                                    .linebreak()
+                                    .text(
+                                            "By default, when the queue is full, all calls to the %s and %s methods fail unless you set %s to true.",
+                                            code("Send"),
+                                            code("SendAsync"),
+                                            code("BlockIfQueueFull"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessagesAcrossPartitions")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of pending messages across partitions.")
+                                    .linebreak()
+                                    .text(
+                                            "Use the setting to lower the max pending messages for each partition (%s) if the total number exceeds the configured value.",
+                                            code("setMaxPendingMessages"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxPublishDelayMicros")
+                    .longType()
+                    .defaultValue(MILLISECONDS.toMicros(1))
+                    .withDescription("Batching time period of sending messages.");
+
+    public static final ConfigOption<Integer>
+            PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY =
+                    ConfigOptions.key(
+                                    PRODUCER_CONFIG_PREFIX
+                                            + "batchingPartitionSwitchFrequencyByPublishDelay")
+                            .intType()
+                            .defaultValue(10)
+                            .withDescription(
+                                    "The maximum wait time for switching topic partitions.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_BATCHING_MAX_MESSAGES)
+                    .withDescription("The maximum number of messages permitted in a batch.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_BYTES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxBytes")
+                    .intType()
+                    .defaultValue(128 * 1024)
+                    .withDescription(
+                            "The maximum size of messages permitted in a batch. Keep the maximum consistent as previous versions.");
+
+    public static final ConfigOption<Boolean> PULSAR_BATCHING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingEnabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable batch send ability, it was enabled by default.");
+
+    public static final ConfigOption<Boolean> PULSAR_CHUNKING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "chunkingEnabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Split messages larger than the broker allows into chunks.");
+
+    public static final ConfigOption<CompressionType> PULSAR_COMPRESSION_TYPE =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "compressionType")
+                    .enumType(CompressionType.class)
+                    .defaultValue(CompressionType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message data compression type used by a producer.")
+                                    .text(" Available options:")
+                                    .list(
+                                            link("https://github.com/lz4/lz4", "LZ4"),
+                                            link("https://zlib.net/", "ZLIB"),
+                                            link("https://facebook.github.io/zstd/", "ZSTD"),
+                                            link("https://google.github.io/snappy/", "SNAPPY"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_INITIAL_SEQUENCE_ID =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "initialSequenceId")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.");
+
+    public static final ConfigOption<Map<String, String>> PULSAR_PRODUCER_PROPERTIES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "properties")
+                    .mapType()
+                    .defaultValue(emptyMap())
+                    .withDescription(
+                            Description.builder()
+                                    .text("A name or value property of this producer.")
+                                    .text(
+                                            " %s is application defined metadata attached to a producer.",
+                                            code("properties"))
+                                    .text(
+                                            " When getting a topic stats, associate this metadata with the producer stats for easier identification.")
+                                    .build());
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
new file mode 100644
index 0000000..13821fe
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigValidator;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Map;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_BYTES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_CHUNKING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_COMPRESSION_TYPE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_INITIAL_SEQUENCE_ID;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_PROPERTIES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static org.apache.pulsar.client.api.MessageRoutingMode.SinglePartition;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+
+/** Helpers for validating the sink config and preparing a Pulsar {@link Producer} builder. */
+@Internal
+public final class PulsarSinkConfigUtils {
+
+    /** Service and admin URLs are required; the two auth parameter options conflict. */
+    public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
+            PulsarConfigValidator.builder()
+                    .requiredOption(PULSAR_SERVICE_URL)
+                    .requiredOption(PULSAR_ADMIN_URL)
+                    .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
+                    .build();
+
+    private PulsarSinkConfigUtils() {
+        // Static helpers only; this class is never instantiated.
+    }
+
+    /** Build a {@link ProducerBuilder} for the schema, applying the given sink configuration. */
+    public static <T> ProducerBuilder<T> createProducerBuilder(
+            PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
+        ProducerBuilder<T> builder = client.newProducer(schema);
+
+        configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName);
+        configuration.useOption(
+                PULSAR_SEND_TIMEOUT_MS,
+                Math::toIntExact,
+                timeoutMs -> builder.sendTimeout(timeoutMs, MILLISECONDS));
+        configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
+        configuration.useOption(
+                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
+                builder::maxPendingMessagesAcrossPartitions);
+        configuration.useOption(
+                PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
+                delayMicros -> builder.batchingMaxPublishDelay(delayMicros, MICROSECONDS));
+        configuration.useOption(
+                PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY,
+                builder::roundRobinRouterBatchingPartitionSwitchFrequency);
+        configuration.useOption(PULSAR_BATCHING_MAX_MESSAGES, builder::batchingMaxMessages);
+        configuration.useOption(PULSAR_BATCHING_MAX_BYTES, builder::batchingMaxBytes);
+        configuration.useOption(PULSAR_BATCHING_ENABLED, builder::enableBatching);
+        configuration.useOption(PULSAR_CHUNKING_ENABLED, builder::enableChunking);
+        configuration.useOption(PULSAR_COMPRESSION_TYPE, builder::compressionType);
+        configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, builder::initialSequenceId);
+
+        // Only hand the producer properties over when the user configured some.
+        Map<String, String> producerProperties = configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
+        if (!producerProperties.isEmpty()) {
+            builder.properties(producerProperties);
+        }
+
+        // Fixed, non-configurable settings: the sink always uses a non-partitioned producer.
+        builder.blockIfQueueFull(true)
+                .messageRoutingMode(SinglePartition)
+                .enableMultiSchema(false)
+                .autoUpdatePartitions(false)
+                .accessMode(Shared)
+                .enableLazyStartPartitionedProducers(false);
+
+        return builder;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
new file mode 100644
index 0000000..e0ef7ff
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Objects;
+
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+
+/** The configuration class for pulsar sink; it caches the sink options as typed fields. */
+@PublicEvolving
+public class SinkConfiguration extends PulsarConfiguration {
+    private static final long serialVersionUID = 4941360605051251153L;
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final long transactionTimeoutMillis;
+    private final long topicMetadataRefreshInterval;
+    private final int partitionSwitchSize;
+    private final boolean enableSchemaEvolution;
+    private final int maxPendingMessages;
+    private final int maxRecommitTimes;
+
+    /** Read the sink related options from the given configuration into the typed fields. */
+    public SinkConfiguration(Configuration configuration) {
+        super(configuration);
+        this.deliveryGuarantee = get(PULSAR_WRITE_DELIVERY_GUARANTEE);
+        this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+        this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
+        this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+        this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
+        // The writer-level pending limit is read from the across-partitions producer option.
+        this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+        this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
+    }
+
+    /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    /**
+     * Pulsar's transactions have a timeout mechanism for the uncommitted transaction. We use
+     * transactions for making sure the message could be written only once. Since the checkpoint
+     * interval couldn't be acquired from {@link InitContext}, we have to expose this option. Make
+     * sure this value is greater than the checkpoint interval.
+     */
+    public long getTransactionTimeoutMillis() {
+        return transactionTimeoutMillis;
+    }
+
+    /** Interval (in ms) for auto-updating the topic metadata. The default value is 30 minutes. */
+    public long getTopicMetadataRefreshInterval() {
+        return topicMetadataRefreshInterval;
+    }
+
+    /**
+     * Switch the partition to write when we have written the given size of messages. It's used for
+     * a round-robin topic router.
+     */
+    public int getPartitionSwitchSize() {
+        return partitionSwitchSize;
+    }
+
+    /**
+     * If we should serialize and send the message with a specified Pulsar {@link Schema} instead
+     * of the default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}.
+     */
+    public boolean isEnableSchemaEvolution() {
+        return enableSchemaEvolution;
+    }
+
+    /**
+     * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
+     * Pulsar writer instance.
+     */
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /** The maximum allowed recommitting times for a Pulsar transaction. */
+    public int getMaxRecommitTimes() {
+        return maxRecommitTimes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        SinkConfiguration that = (SinkConfiguration) o;
+        return deliveryGuarantee == that.deliveryGuarantee
+                && transactionTimeoutMillis == that.transactionTimeoutMillis
+                && topicMetadataRefreshInterval == that.topicMetadataRefreshInterval
+                && partitionSwitchSize == that.partitionSwitchSize
+                && enableSchemaEvolution == that.enableSchemaEvolution
+                && maxPendingMessages == that.maxPendingMessages
+                && maxRecommitTimes == that.maxRecommitTimes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                deliveryGuarantee,
+                transactionTimeoutMillis,
+                topicMetadataRefreshInterval,
+                partitionSwitchSize,
+                enableSchemaEvolution,
+                maxPendingMessages,
+                maxRecommitTimes);
+    }
+}