You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/06 16:38:09 UTC

[flink-connector-pulsar] 08/16: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#21074)

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit c6a20a66bf22383c114a6ae23f70413d6736aa51
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Oct 18 21:32:37 2022 +0800

    [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#21074)
---
 .../connector/pulsar/sink/PulsarSinkOptions.java   |  2 +
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |  4 --
 .../pulsar/sink/config/SinkConfiguration.java      | 13 ----
 .../connector/pulsar/sink/writer/PulsarWriter.java | 82 +++++++++-------------
 4 files changed, 35 insertions(+), 66 deletions(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index dafb8aa..0433bb0 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -127,6 +127,8 @@ public final class PulsarSinkOptions {
                             "The allowed transaction recommit times if we meet some retryable exception."
                                     + " This is used in Pulsar Transaction.");
 
+    /** @deprecated This config option is no longer used; it was dropped for better performance. */
+    @Deprecated
     public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
             ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
                     .intType()
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 5f935a4..61cfd5a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -84,10 +84,6 @@ public final class PulsarSinkConfigUtils {
                 PULSAR_SEND_TIMEOUT_MS,
                 Math::toIntExact,
                 ms -> builder.sendTimeout(ms, MILLISECONDS));
-        configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
-        configuration.useOption(
-                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
-                builder::maxPendingMessagesAcrossPartitions);
         configuration.useOption(
                 PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
                 s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index 768b730..d6a6ee3 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.Schema;
 import java.util.Objects;
 
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
@@ -51,7 +50,6 @@ public class SinkConfiguration extends PulsarConfiguration {
     private final int partitionSwitchSize;
     private final MessageKeyHash messageKeyHash;
     private final boolean enableSchemaEvolution;
-    private final int maxPendingMessages;
     private final int maxRecommitTimes;
 
     public SinkConfiguration(Configuration configuration) {
@@ -63,7 +61,6 @@ public class SinkConfiguration extends PulsarConfiguration {
         this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
         this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
-        this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
     }
 
@@ -111,14 +108,6 @@ public class SinkConfiguration extends PulsarConfiguration {
         return enableSchemaEvolution;
     }
 
-    /**
-     * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
-     * Pulsar writer instance.
-     */
-    public int getMaxPendingMessages() {
-        return maxPendingMessages;
-    }
-
     /** The maximum allowed recommitting time for a Pulsar transaction. */
     public int getMaxRecommitTimes() {
         return maxRecommitTimes;
@@ -141,7 +130,6 @@ public class SinkConfiguration extends PulsarConfiguration {
                 && partitionSwitchSize == that.partitionSwitchSize
                 && enableSchemaEvolution == that.enableSchemaEvolution
                 && messageKeyHash == that.messageKeyHash
-                && maxPendingMessages == that.maxPendingMessages
                 && maxRecommitTimes == that.maxRecommitTimes;
     }
 
@@ -154,7 +142,6 @@ public class SinkConfiguration extends PulsarConfiguration {
                 partitionSwitchSize,
                 messageKeyHash,
                 enableSchemaEvolution,
-                maxPendingMessages,
                 maxRecommitTimes);
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 927e40c..e30b593 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -48,7 +48,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Collections.emptyList;
 import static org.apache.flink.util.IOUtils.closeAll;
@@ -64,17 +64,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
     private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
 
-    private final SinkConfiguration sinkConfiguration;
     private final PulsarSerializationSchema<IN> serializationSchema;
     private final TopicMetadataListener metadataListener;
     private final TopicRouter<IN> topicRouter;
     private final MessageDelayer<IN> messageDelayer;
     private final DeliveryGuarantee deliveryGuarantee;
     private final PulsarSinkContext sinkContext;
-    private final MailboxExecutor mailboxExecutor;
     private final TopicProducerRegister producerRegister;
-
-    private long pendingMessages = 0;
+    private final MailboxExecutor mailboxExecutor;
+    private final AtomicLong pendingMessages = new AtomicLong(0);
 
     /**
      * Constructor creating a Pulsar writer.
@@ -96,7 +94,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
             TopicRouter<IN> topicRouter,
             MessageDelayer<IN> messageDelayer,
             InitContext initContext) {
-        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+        checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.metadataListener = checkNotNull(metadataListener);
         this.topicRouter = checkNotNull(topicRouter);
@@ -105,7 +103,6 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
 
         this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
         this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
-        this.mailboxExecutor = initContext.getMailboxExecutor();
 
         // Initialize topic metadata listener.
         LOG.debug("Initialize topic metadata after creating Pulsar writer.");
@@ -126,6 +123,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
 
         // Create this producer register after opening serialization schema!
         this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+        this.mailboxExecutor = initContext.getMailboxExecutor();
     }
 
     @Override
@@ -151,42 +149,28 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
             // We would just ignore the sending exception. This may cause data loss.
             builder.sendAsync();
         } else {
-            // Waiting for permits to write message.
-            requirePermits();
-            mailboxExecutor.execute(
-                    () -> enqueueMessageSending(topic, builder),
-                    "Failed to send message to Pulsar");
-        }
-    }
-
-    private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
-            throws ExecutionException, InterruptedException {
-        // Block the mailbox executor for yield method.
-        builder.sendAsync()
-                .whenComplete(
-                        (id, ex) -> {
-                            this.releasePermits();
-                            if (ex != null) {
-                                throw new FlinkRuntimeException(
-                                        "Failed to send data to Pulsar " + topic, ex);
-                            } else {
-                                LOG.debug(
-                                        "Sent message to Pulsar {} with message id {}", topic, id);
-                            }
-                        })
-                .get();
-    }
-
-    private void requirePermits() throws InterruptedException {
-        while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
-            LOG.info("Waiting for the available permits.");
-            mailboxExecutor.yield();
+            // Increase the pending message count.
+            pendingMessages.incrementAndGet();
+            builder.sendAsync()
+                    .whenComplete(
+                            (id, ex) -> {
+                                pendingMessages.decrementAndGet();
+                                if (ex != null) {
+                                    mailboxExecutor.execute(
+                                            () -> {
+                                                throw new FlinkRuntimeException(
+                                                        "Failed to send data to Pulsar " + topic,
+                                                        ex);
+                                            },
+                                            "Failed to send data to Pulsar");
+                                } else {
+                                    LOG.debug(
+                                            "Sent message to Pulsar {} with message id {}",
+                                            topic,
+                                            id);
+                                }
+                            });
         }
-        pendingMessages++;
-    }
-
-    private void releasePermits() {
-        this.pendingMessages -= 1;
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -244,15 +228,15 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
     }
 
     @Override
-    public void flush(boolean endOfInput) throws IOException, InterruptedException {
-        if (endOfInput) {
-            // Try flush only once when we meet the end of the input.
+    public void flush(boolean endOfInput) throws IOException {
+        if (endOfInput || deliveryGuarantee != DeliveryGuarantee.NONE) {
+            LOG.info("Flush the pending messages to Pulsar.");
+
+            // Try to flush pending messages.
             producerRegister.flush();
-        } else {
-            while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
+            // Keep flushing until there are no pending messages left.
+            while (pendingMessages.longValue() > 0) {
                 producerRegister.flush();
-                LOG.info("Flush the pending messages to Pulsar.");
-                mailboxExecutor.yield();
             }
         }
     }