You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/10 08:40:05 UTC

[flink-connector-pulsar] branch main updated: [FLINK-28870][Connector/Pulsar] Improve the Pulsar source performance when meeting small data rates. (#15)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 6991f38  [FLINK-28870][Connector/Pulsar] Improve the Pulsar source performance when meeting small data rates. (#15)
6991f38 is described below

commit 6991f383228b59cd86b874490aca3f190c59d168
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Jan 10 16:39:58 2023 +0800

    [FLINK-28870][Connector/Pulsar] Improve the Pulsar source performance when meeting small data rates. (#15)
---
 .../generated/pulsar_source_configuration.html     |  6 ++++++
 .../pulsar/source/PulsarSourceOptions.java         | 19 +++++++++++++++++
 .../pulsar/source/config/SourceConfiguration.java  | 19 +++++++++++++++--
 .../source/reader/PulsarPartitionSplitReader.java  | 24 ++++++++--------------
 .../source/enumerator/cursor/StopCursorTest.java   |  4 +++-
 .../reader/PulsarPartitionSplitReaderTest.java     |  6 ++++--
 .../source/reader/PulsarSourceReaderTest.java      |  4 +++-
 7 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
index 7682394..94c2bd8 100644
--- a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -38,6 +38,12 @@
             <td>Boolean</td>
             <td>The metrics from Pulsar Consumer are only exposed if you enable this option.You should set the <code class="highlighter-rouge">pulsar.client.statsIntervalSeconds</code> to a positive value if you enable this option.</td>
         </tr>
+        <tr>
+            <td><h5>pulsar.source.fetchOneMessageTime</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar. We would consider there is no record at the current topic partition and stop fetching until next switch.<br />It's not configured by default. We will use the remaining time in <code class="highlighter-rouge">pulsar.source.maxFetchTime</code> by default, which may cause a long wait in small message rates. Add this option in source builder avoiding waiting too long.</td>
+        </tr>
         <tr>
             <td><h5>pulsar.source.maxFetchRecords</h5></td>
             <td style="word-wrap: break-word;">100</td>
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index a74f80f..fa42ccf 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -114,6 +114,25 @@ public final class PulsarSourceOptions {
                                             " We would automatically commit the cursor using the given period (in ms).")
                                     .build());
 
+    public static final ConfigOption<Integer> PULSAR_FETCH_ONE_MESSAGE_TIME =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar.")
+                                    .text(
+                                            " We would consider there is no record at the current topic partition and stop fetching until next switch.")
+                                    .linebreak()
+                                    .text(
+                                            "It's not configured by default. We will use the remaining time in %s by default,",
+                                            code("pulsar.source.maxFetchTime"))
+                                    .text(" which may cause a long wait in small message rates.")
+                                    .text(
+                                            " Add this option in source builder avoiding waiting too long.")
+                                    .build());
+
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
                     .longType()
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 96e886a..7fb5e10 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -39,6 +39,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -56,6 +57,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final long partitionDiscoveryIntervalMs;
     private final boolean enableAutoAcknowledgeMessage;
     private final long autoCommitCursorInterval;
+    private final int fetchOneMessageTime;
     private final Duration maxFetchTime;
     private final int maxFetchRecords;
     private final CursorVerification verifyInitialOffsets;
@@ -72,6 +74,7 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.partitionDiscoveryIntervalMs = get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
         this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
         this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
+        this.fetchOneMessageTime = getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0);
         this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
         this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
         this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
@@ -125,6 +128,14 @@ public class SourceConfiguration extends PulsarConfiguration {
         return autoCommitCursorInterval;
     }
 
+    /**
+     * The timeout (in ms) for polling one message. Polling stops and the fetched messages are
+     * returned in {@link RecordsWithSplitIds} once this timeout passes with no message consumed.
+     */
+    public int getFetchOneMessageTime() {
+        return fetchOneMessageTime;
+    }
+
     /**
      * The fetch time for flink split reader polling message. We would stop polling message and
      * return the message in {@link RecordsWithSplitIds} when timeout or exceed the {@link
@@ -202,11 +213,13 @@ public class SourceConfiguration extends PulsarConfiguration {
             return false;
         }
         SourceConfiguration that = (SourceConfiguration) o;
-        return partitionDiscoveryIntervalMs == that.partitionDiscoveryIntervalMs
+        return messageQueueCapacity == that.messageQueueCapacity
+                && partitionDiscoveryIntervalMs == that.partitionDiscoveryIntervalMs
                 && enableAutoAcknowledgeMessage == that.enableAutoAcknowledgeMessage
                 && autoCommitCursorInterval == that.autoCommitCursorInterval
-                && maxFetchRecords == that.maxFetchRecords
+                && fetchOneMessageTime == that.fetchOneMessageTime
                 && Objects.equals(maxFetchTime, that.maxFetchTime)
+                && maxFetchRecords == that.maxFetchRecords
                 && verifyInitialOffsets == that.verifyInitialOffsets
                 && Objects.equals(subscriptionName, that.subscriptionName)
                 && subscriptionMode == that.subscriptionMode
@@ -219,9 +232,11 @@ public class SourceConfiguration extends PulsarConfiguration {
     public int hashCode() {
         return Objects.hash(
                 super.hashCode(),
+                messageQueueCapacity,
                 partitionDiscoveryIntervalMs,
                 enableAutoAcknowledgeMessage,
                 autoCommitCursorInterval,
+                fetchOneMessageTime,
                 maxFetchTime,
                 maxFetchRecords,
                 verifyInitialOffsets,
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 638e4c7..6ba3274 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -50,14 +50,12 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -121,6 +119,7 @@ public class PulsarPartitionSplitReader
     }
 
     @Override
+    @SuppressWarnings("java:S135")
     public RecordsWithSplitIds<Message<byte[]>> fetch() throws IOException {
         RecordsBySplits.Builder<Message<byte[]>> builder = new RecordsBySplits.Builder<>();
 
@@ -138,8 +137,12 @@ public class PulsarPartitionSplitReader
                 messageNum < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft();
                 messageNum++) {
             try {
-                Duration timeout = deadline.timeLeftIfAny();
-                Message<byte[]> message = pollMessage(timeout);
+                int fetchTime = sourceConfiguration.getFetchOneMessageTime();
+                if (fetchTime <= 0) {
+                    fetchTime = (int) deadline.timeLeftIfAny().toMillis();
+                }
+
+                Message<byte[]> message = pulsarConsumer.receive(fetchTime, TimeUnit.MILLISECONDS);
                 if (message == null) {
                     break;
                 }
@@ -228,7 +231,7 @@ public class PulsarPartitionSplitReader
         }
 
         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit);
+        this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
 
         LOG.info("Register split {} consumer for current reader.", registeredSplit);
     }
@@ -261,10 +264,6 @@ public class PulsarPartitionSplitReader
         }
     }
 
-    protected Message<byte[]> pollMessage(Duration timeout) throws PulsarClientException {
-        return pulsarConsumer.receive(Math.toIntExact(timeout.toMillis()), TimeUnit.MILLISECONDS);
-    }
-
     public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) {
         if (pulsarConsumer == null) {
             this.pulsarConsumer = createPulsarConsumer(partition);
@@ -275,13 +274,8 @@ public class PulsarPartitionSplitReader
 
     // --------------------------- Helper Methods -----------------------------
 
-    /** Create a specified {@link Consumer} by the given split information. */
-    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
-        return createPulsarConsumer(split.getPartition());
-    }
-
     /** Create a specified {@link Consumer} by the given topic partition. */
-    protected Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
+    private Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
         ConsumerBuilder<byte[]> consumerBuilder =
                 createConsumerBuilder(pulsarClient, schema, sourceConfiguration);
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index d82aad3..9e13b15 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -39,6 +39,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -97,7 +98,8 @@ class StopCursorTest extends PulsarTestSuiteBase {
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
         return new SourceConfiguration(config);
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index 6902a7e..9d5f65b 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -48,6 +48,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -277,11 +278,12 @@ class PulsarPartitionSplitReaderTest extends PulsarTestSuiteBase {
                 createSourceReaderMetricGroup());
     }
 
-    /** Default source config: max message 1, fetch timeout 1s. */
+    /** Default source config: max message 1, per-message fetch timeout 2s, max fetch time 3s. */
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index e805d14..8a69992 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -61,6 +61,7 @@ import java.util.function.Supplier;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -229,7 +230,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
         Configuration configuration = operator().config();
 
         configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        configuration.set(PULSAR_MAX_FETCH_TIME, 3000L);
         configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
 
         PulsarDeserializationSchema<Integer> deserializationSchema =