You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:07 UTC
[pulsar] 21/38: Make messageReceiveTimeoutMs in the
PulsarConsumerSource configurable (#6783)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8783907da1ec91e10fcd6b8dc7d5d928e2778498
Author: luceneReader <54...@qq.com>
AuthorDate: Sat Apr 25 14:47:56 2020 +0800
Make messageReceiveTimeoutMs in the PulsarConsumerSource configurable (#6783)
The messageReceiveTimeoutMs value in the PulsarConsumerSource class is hardcoded to 100ms and cannot be altered through configuration at present.
(cherry picked from commit 47b4dd071a7e4f6d31f0d6f9abb1f182260820d8)
---
.../connectors/pulsar/PulsarConsumerSource.java | 3 ++-
.../streaming/connectors/pulsar/PulsarSourceBuilder.java | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 9606dfc..e137cae 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -54,7 +54,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
- private final int messageReceiveTimeoutMs = 100;
+ private int messageReceiveTimeoutMs;
private ClientConfigurationData clientConfigurationData;
private ConsumerConfigurationData<byte[]> consumerConfigurationData;
@@ -81,6 +81,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
this.consumerConfigurationData = builder.consumerConfigurationData;
this.deserializer = builder.deserializationSchema;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
+ this.messageReceiveTimeoutMs = builder.messageReceiveTimeoutMs;
}
@Override
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 15dc6e4..67690be 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -44,6 +44,7 @@ public class PulsarSourceBuilder<T> {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
+ private static final int DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS = 100;
private static final String SUBSCRIPTION_NAME = "flink-sub";
final DeserializationSchema<T> deserializationSchema;
@@ -52,6 +53,8 @@ public class PulsarSourceBuilder<T> {
ConsumerConfigurationData<byte[]> consumerConfigurationData;
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
+ //
+ int messageReceiveTimeoutMs = DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS;
private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
@@ -193,6 +196,19 @@ public class PulsarSourceBuilder<T> {
}
/**
+ * parameterize messageReceiveTimeoutMs for `PulsarConsumerSource`.
+ * @param timeout timeout in ms, should be gt 0
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> messageReceiveTimeoutMs(int timeout) {
+ if (timeout <= 0) {
+ throw new IllegalArgumentException("messageReceiveTimeoutMs can only take values > 0");
+ }
+ this.messageReceiveTimeoutMs = timeout;
+ return this;
+ }
+
+ /**
* Set the authentication provider to use in the Pulsar client instance.
*
* @param authentication an instance of the {@link Authentication} provider already constructed