You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:07 UTC

[pulsar] 21/38: Make messageReceiveTimeoutMs in the PulsarConsumerSource configurable (#6783)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8783907da1ec91e10fcd6b8dc7d5d928e2778498
Author: luceneReader <54...@qq.com>
AuthorDate: Sat Apr 25 14:47:56 2020 +0800

    Make messageReceiveTimeoutMs in the PulsarConsumerSource configurable (#6783)
    
    The messageReceiveTimeoutMs value in the PulsarConsumerSource class is hardcoded to 100ms and cannot be altered through configuration at present.
    (cherry picked from commit 47b4dd071a7e4f6d31f0d6f9abb1f182260820d8)
---
 .../connectors/pulsar/PulsarConsumerSource.java          |  3 ++-
 .../streaming/connectors/pulsar/PulsarSourceBuilder.java | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 9606dfc..e137cae 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -54,7 +54,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
 
-    private final int messageReceiveTimeoutMs = 100;
+    private int messageReceiveTimeoutMs;
 
     private ClientConfigurationData clientConfigurationData;
     private ConsumerConfigurationData<byte[]> consumerConfigurationData;
@@ -81,6 +81,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
         this.consumerConfigurationData = builder.consumerConfigurationData;
         this.deserializer = builder.deserializationSchema;
         this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
+        this.messageReceiveTimeoutMs = builder.messageReceiveTimeoutMs;
     }
 
     @Override
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 15dc6e4..67690be 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -44,6 +44,7 @@ public class PulsarSourceBuilder<T> {
     private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
     private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
+    private static final int DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS = 100;
     private static final String SUBSCRIPTION_NAME = "flink-sub";
 
     final DeserializationSchema<T> deserializationSchema;
@@ -52,6 +53,8 @@ public class PulsarSourceBuilder<T> {
     ConsumerConfigurationData<byte[]> consumerConfigurationData;
 
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
+    // Message receive timeout in milliseconds handed to PulsarConsumerSource; defaults to 100 ms.
+    int messageReceiveTimeoutMs = DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS;
 
     private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
         this.deserializationSchema = deserializationSchema;
@@ -193,6 +196,19 @@ public class PulsarSourceBuilder<T> {
     }
 
     /**
+     * Set the message receive timeout used by {@code PulsarConsumerSource}.
+     * @param timeout timeout in milliseconds; must be greater than 0
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> messageReceiveTimeoutMs(int timeout) {
+        if (timeout <= 0) {
+            throw new IllegalArgumentException("messageReceiveTimeoutMs can only take values > 0");
+        }
+        this.messageReceiveTimeoutMs = timeout;
+        return this;
+    }
+
+    /**
      * Set the authentication provider to use in the Pulsar client instance.
      *
      * @param authentication an instance of the {@link Authentication} provider already constructed