You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/06 16:38:09 UTC
[flink-connector-pulsar] 08/16: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#21074)
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit c6a20a66bf22383c114a6ae23f70413d6736aa51
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Oct 18 21:32:37 2022 +0800
[FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#21074)
---
.../connector/pulsar/sink/PulsarSinkOptions.java | 2 +
.../pulsar/sink/config/PulsarSinkConfigUtils.java | 4 --
.../pulsar/sink/config/SinkConfiguration.java | 13 ----
.../connector/pulsar/sink/writer/PulsarWriter.java | 82 +++++++++-------------
4 files changed, 35 insertions(+), 66 deletions(-)
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index dafb8aa..0433bb0 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -127,6 +127,8 @@ public final class PulsarSinkOptions {
"The allowed transaction recommit times if we meet some retryable exception."
+ " This is used in Pulsar Transaction.");
+ /** @deprecated This config option was removed for better performance. */
+ @Deprecated
public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
.intType()
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 5f935a4..61cfd5a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -84,10 +84,6 @@ public final class PulsarSinkConfigUtils {
PULSAR_SEND_TIMEOUT_MS,
Math::toIntExact,
ms -> builder.sendTimeout(ms, MILLISECONDS));
- configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
- configuration.useOption(
- PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
- builder::maxPendingMessagesAcrossPartitions);
configuration.useOption(
PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index 768b730..d6a6ee3 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.Schema;
import java.util.Objects;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
@@ -51,7 +50,6 @@ public class SinkConfiguration extends PulsarConfiguration {
private final int partitionSwitchSize;
private final MessageKeyHash messageKeyHash;
private final boolean enableSchemaEvolution;
- private final int maxPendingMessages;
private final int maxRecommitTimes;
public SinkConfiguration(Configuration configuration) {
@@ -63,7 +61,6 @@ public class SinkConfiguration extends PulsarConfiguration {
this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
- this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM);
this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
}
@@ -111,14 +108,6 @@ public class SinkConfiguration extends PulsarConfiguration {
return enableSchemaEvolution;
}
- /**
- * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
- * Pulsar writer instance.
- */
- public int getMaxPendingMessages() {
- return maxPendingMessages;
- }
-
/** The maximum allowed recommitting time for a Pulsar transaction. */
public int getMaxRecommitTimes() {
return maxRecommitTimes;
@@ -141,7 +130,6 @@ public class SinkConfiguration extends PulsarConfiguration {
&& partitionSwitchSize == that.partitionSwitchSize
&& enableSchemaEvolution == that.enableSchemaEvolution
&& messageKeyHash == that.messageKeyHash
- && maxPendingMessages == that.maxPendingMessages
&& maxRecommitTimes == that.maxRecommitTimes;
}
@@ -154,7 +142,6 @@ public class SinkConfiguration extends PulsarConfiguration {
partitionSwitchSize,
messageKeyHash,
enableSchemaEvolution,
- maxPendingMessages,
maxRecommitTimes);
}
}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 927e40c..e30b593 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -48,7 +48,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.emptyList;
import static org.apache.flink.util.IOUtils.closeAll;
@@ -64,17 +64,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
- private final SinkConfiguration sinkConfiguration;
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRouter<IN> topicRouter;
private final MessageDelayer<IN> messageDelayer;
private final DeliveryGuarantee deliveryGuarantee;
private final PulsarSinkContext sinkContext;
- private final MailboxExecutor mailboxExecutor;
private final TopicProducerRegister producerRegister;
-
- private long pendingMessages = 0;
+ private final MailboxExecutor mailboxExecutor;
+ private final AtomicLong pendingMessages = new AtomicLong(0);
/**
* Constructor creating a Pulsar writer.
@@ -96,7 +94,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
InitContext initContext) {
- this.sinkConfiguration = checkNotNull(sinkConfiguration);
+ checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRouter = checkNotNull(topicRouter);
@@ -105,7 +103,6 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
- this.mailboxExecutor = initContext.getMailboxExecutor();
// Initialize topic metadata listener.
LOG.debug("Initialize topic metadata after creating Pulsar writer.");
@@ -126,6 +123,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
// Create this producer register after opening serialization schema!
this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+ this.mailboxExecutor = initContext.getMailboxExecutor();
}
@Override
@@ -151,42 +149,28 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
// We would just ignore the sending exception. This may cause data loss.
builder.sendAsync();
} else {
- // Waiting for permits to write message.
- requirePermits();
- mailboxExecutor.execute(
- () -> enqueueMessageSending(topic, builder),
- "Failed to send message to Pulsar");
- }
- }
-
- private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
- throws ExecutionException, InterruptedException {
- // Block the mailbox executor for yield method.
- builder.sendAsync()
- .whenComplete(
- (id, ex) -> {
- this.releasePermits();
- if (ex != null) {
- throw new FlinkRuntimeException(
- "Failed to send data to Pulsar " + topic, ex);
- } else {
- LOG.debug(
- "Sent message to Pulsar {} with message id {}", topic, id);
- }
- })
- .get();
- }
-
- private void requirePermits() throws InterruptedException {
- while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
- LOG.info("Waiting for the available permits.");
- mailboxExecutor.yield();
+ // Increase the pending message count.
+ pendingMessages.incrementAndGet();
+ builder.sendAsync()
+ .whenComplete(
+ (id, ex) -> {
+ pendingMessages.decrementAndGet();
+ if (ex != null) {
+ mailboxExecutor.execute(
+ () -> {
+ throw new FlinkRuntimeException(
+ "Failed to send data to Pulsar " + topic,
+ ex);
+ },
+ "Failed to send data to Pulsar");
+ } else {
+ LOG.debug(
+ "Sent message to Pulsar {} with message id {}",
+ topic,
+ id);
+ }
+ });
}
- pendingMessages++;
- }
-
- private void releasePermits() {
- this.pendingMessages -= 1;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -244,15 +228,15 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
}
@Override
- public void flush(boolean endOfInput) throws IOException, InterruptedException {
- if (endOfInput) {
- // Try flush only once when we meet the end of the input.
+ public void flush(boolean endOfInput) throws IOException {
+ if (endOfInput || deliveryGuarantee != DeliveryGuarantee.NONE) {
+ LOG.info("Flush the pending messages to Pulsar.");
+
+ // Try to flush pending messages.
producerRegister.flush();
- } else {
- while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
+ // Make sure all the pending messages should be flushed to Pulsar.
+ while (pendingMessages.longValue() > 0) {
producerRegister.flush();
- LOG.info("Flush the pending messages to Pulsar.");
- mailboxExecutor.yield();
}
}
}