You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/02 16:41:14 UTC
[pulsar] branch master updated: [Issue-2122] [pulsar-client] Adding
configuration for backoff strategy (#3848)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bdfc098 [Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
bdfc098 is described below
commit bdfc09867325a271dc40a3aa87dcefb905769f06
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Tue Apr 2 09:41:04 2019 -0700
[Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
Fixes #2122
### Motivation
The current backoff strategy is fixed to built-in defaults and is too aggressive. It should be configurable by the user.
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not sure)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 4 +-
.../apache/pulsar/client/api/ClientBuilder.java | 22 ++++++
.../org/apache/pulsar/client/impl/Backoff.java | 54 ++++++++++++---
.../apache/pulsar/client/impl/BackoffBuilder.java | 81 ++++++++++++++++++++++
.../client/impl/BinaryProtoLookupService.java | 10 ++-
.../pulsar/client/impl/ClientBuilderImpl.java | 12 ++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 46 +++++++++---
.../client/impl/MultiTopicsConsumerImpl.java | 11 ++-
.../apache/pulsar/client/impl/ProducerImpl.java | 9 ++-
.../pulsar/client/impl/PulsarClientImpl.java | 3 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 4 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 10 ++-
.../client/impl/conf/ClientConfigurationData.java | 3 +
.../pulsar/client/impl/ConsumerImplTest.java | 16 ++++-
.../java/org/apache/pulsar/storm/PulsarSpout.java | 5 +-
15 files changed, 257 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index fa527c1..d1619f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -111,7 +111,9 @@ public class RawReaderImpl implements RawReader {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
- Schema.BYTES, null);
+ Schema.BYTES, null,
+ client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos());
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 9d30aba..a99d08b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -354,4 +354,26 @@ public interface ClientBuilder extends Cloneable {
* @return the client builder instance
*/
ClientBuilder connectionTimeout(int duration, TimeUnit unit);
+
+ /**
+ * Set the starting (initial) duration of a backoff interval.
+ *
+ * @param duration
+ * the duration of the interval
+ * @param unit
+ * the time unit in which the duration is defined
+ * @return the client builder instance
+ */
+ ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);
+
+ /**
+ * Set the maximum duration of time for a backoff interval.
+ *
+ * @param duration
+ * the duration of the interval
+ * @param unit
+ * the time unit in which the duration is defined
+ * @return the client builder instance
+ */
+ ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index aa506da..1cb2e1e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
// All variables are in TimeUnit millis by default
public class Backoff {
- private static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
- private static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
+ public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
+ public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
+ private final long backoffIntervalNanos;
+ private final long maxBackoffIntervalNanos;
private final long initial;
private final long max;
private final Clock clock;
@@ -40,19 +42,33 @@ public class Backoff {
@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop, Clock clock) {
+ TimeUnit unitMandatoryStop, Clock clock, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
this.initial = unitInitial.toMillis(initial);
this.max = unitMax.toMillis(max);
this.next = this.initial;
this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
this.clock = clock;
+ this.backoffIntervalNanos = backoffIntervalNanos;
+ this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
}
+ @VisibleForTesting
+ Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+ TimeUnit unitMandatoryStop, Clock clock) {
+ this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
+ Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+ }
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop) {
+ TimeUnit unitMandatoryStop) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
}
+ public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+ TimeUnit unitMandatoryStop, long backoffIntervalNanos, long maxBackoffIntervalNanos) { // both intervals are in nanoseconds
+ this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
+ backoffIntervalNanos, maxBackoffIntervalNanos); // forwarded unchanged to the nanosecond-based constructor
+ }
+
public long next() {
long current = this.next;
if (current < max) {
@@ -99,14 +115,25 @@ public class Backoff {
return firstBackoffTimeInMillis;
}
- public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
- long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
+ @VisibleForTesting
+ long backoffIntervalNanos() {
+ return backoffIntervalNanos;
+ }
+
+ @VisibleForTesting
+ long maxBackoffIntervalNanos() {
+ return maxBackoffIntervalNanos;
+ }
+
+ public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
+ long defaultInterval, long maxBackoffInterval) {
+ long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long currentTime = System.nanoTime();
- long interval = DEFAULT_INTERVAL_IN_NANOSECONDS;
+ long interval = defaultInterval;
for (int i = 1; i < failedAttempts; i++) {
interval = interval * 2;
- if (interval > MAX_BACKOFF_INTERVAL_NANOSECONDS) {
- interval = MAX_BACKOFF_INTERVAL_NANOSECONDS;
+ if (interval > maxBackoffInterval) {
+ interval = maxBackoffInterval;
break;
}
}
@@ -114,4 +141,13 @@ public class Backoff {
// if the current time is less than the time at which next retry should occur, we should backoff
return currentTime < (initialTimestampInNano + interval);
}
+
+ public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
+ return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
+ DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
+ }
+
+ public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
+ return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
new file mode 100644
index 0000000..aa997d7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class BackoffBuilder {
+ private long backoffIntervalNanos;
+ private long maxBackoffIntervalNanos;
+ private long initial;
+ private TimeUnit unitInitial;
+ private long max;
+ private TimeUnit unitMax;
+ private Clock clock;
+ private long mandatoryStop;
+ private TimeUnit unitMandatoryStop;
+
+ @VisibleForTesting
+ BackoffBuilder() {
+ this.initial = 0;
+ this.max = 0;
+ this.mandatoryStop = 0;
+ this.clock = Clock.systemDefaultZone();
+ this.backoffIntervalNanos = Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS; // a 0 interval would make instanceShouldBackoff never back off
+ this.maxBackoffIntervalNanos = Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS; // same defaults that useDefaultBackoffIntervals() applies
+ }
+
+ public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
+ this.unitInitial = unitInitial;
+ this.initial = initial;
+ return this;
+ }
+
+ public BackoffBuilder setMax(long max, TimeUnit unitMax) {
+ this.unitMax = unitMax;
+ this.max = max;
+ return this;
+ }
+
+ public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit unitMandatoryStop) {
+ this.mandatoryStop = mandatoryStop;
+ this.unitMandatoryStop = unitMandatoryStop;
+ return this;
+ }
+
+ public BackoffBuilder useDefaultBackoffIntervals() {
+ return useUserConfiguredIntervals(Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS,
+ Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS );
+ }
+
+ public BackoffBuilder useUserConfiguredIntervals(long backoffIntervalNanos,
+ long maxBackoffIntervalNanos) {
+ this.backoffIntervalNanos = backoffIntervalNanos;
+ this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
+ return this;
+ }
+
+ public Backoff create() {
+ return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
+ backoffIntervalNanos, maxBackoffIntervalNanos);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8d07552..0e39f39 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -207,9 +207,13 @@ public class BinaryProtoLookupService implements LookupService {
CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
- opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
- 0 , TimeUnit.MILLISECONDS);
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS) // no mandatory stop, matching the constructor call this replaces
+ .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) // cap on the retry delay: twice the operation timeout
+ .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos())
+ .create();
getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
return topicsFuture;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 5e00a30..a44d286 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -208,6 +208,18 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ @Override
+ public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
+ conf.setDefaultBackoffIntervalNanos(unit.toNanos(duration));
+ return this;
+ }
+
+ @Override
+ public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
+ conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
+ return this;
+ }
+
public ClientConfigurationData getClientConfigurationData() {
return conf;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fc0c660..6dc96d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -138,6 +138,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private Producer<T> deadLetterProducer;
+ private final long backoffIntervalNanos;
+ private final long maxBackoffIntervalNanos;
+
protected volatile boolean paused;
enum SubscriptionMode {
@@ -152,18 +155,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- if (conf.getReceiverQueueSize() == 0) {
+ return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
+ startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+ }
+
+ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+ long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+ if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors);
+ subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors);
+ subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}
-
+
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+ long backoffIntervalNanos, long maxBackoffIntervalNanos) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
@@ -208,8 +220,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
this.connectionHandler = new ConnectionHandler(this,
- new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
- this);
+ new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS) // no mandatory stop, matching the constructor call this replaces
+ .setMax(60, TimeUnit.SECONDS) // cap on the reconnect delay
+ .useUserConfiguredIntervals(backoffIntervalNanos,
+ maxBackoffIntervalNanos)
+ .create(),
+ this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
@@ -238,6 +256,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleSendToDeadLetterTopicMessages = null;
}
+ this.backoffIntervalNanos = backoffIntervalNanos;
+ this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
+
topicNameWithoutPartition = topicName.getPartitionedTopicName();
grabCnx();
@@ -1448,9 +1469,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
- opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
- 0 , TimeUnit.MILLISECONDS);
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS) // no mandatory stop, matching the constructor call this replaces
+ .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) // cap on the retry delay: twice the operation timeout
+ .useUserConfiguredIntervals(backoffIntervalNanos,
+ maxBackoffIntervalNanos)
+ .create();
+
CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();
internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 71af9c7..69dd385 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -749,7 +749,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture,
- SubscriptionMode.Durable, null, schema, interceptors);
+ SubscriptionMode.Durable, null, schema, interceptors,
+ client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
@@ -761,7 +763,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null,
- schema, interceptors);
+ schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
@@ -977,7 +980,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors);
+ partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0479539..c80e529 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -183,8 +183,15 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
this.connectionHandler = new ConnectionHandler(this,
- new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+ new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) // mandatory stop derived from the send timeout, as before
+ .setMax(60, TimeUnit.SECONDS) // cap on the reconnect delay
+ .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ client.getConfiguration().getMaxBackoffIntervalNanos())
+ .create(),
this);
+
grabCnx();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2e92c50..bdd3345 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -334,7 +334,8 @@ public class PulsarClientImpl implements PulsarClient {
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
- consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors);
+ consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}
synchronized (consumers) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 388e5c4..08ca1ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -85,7 +85,9 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null);
+ partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+ client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
+
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 09b610f..b0dc4b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -50,8 +50,16 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors) {
+ this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+ schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+ }
+
+ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
+ ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
super(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
- schema, interceptors);
+ schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index f7bec63..db77bd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.conf;
import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ServiceUrlProvider;
@@ -59,6 +60,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;
private int connectionTimeoutMs = 10000;
+ private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
+ private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
public ClientConfigurationData clone() {
try {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index e09184c..4e6bad6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -32,11 +32,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.*;
public class ConsumerImplTest {
+ private static final long DEFAULT_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
+ private static final long MAX_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(20);
+
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private ConsumerImpl<ConsumerImpl> consumer;
private ConsumerConfigurationData consumerConf;
@@ -54,11 +58,21 @@ public class ConsumerImplTest {
when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
+ clientConf.setDefaultBackoffIntervalNanos(DEFAULT_BACKOFF_INTERVAL_NANOS);
+ clientConf.setMaxBackoffIntervalNanos(MAX_BACKOFF_INTERVAL_NANOS);
when(client.getConfiguration()).thenReturn(clientConf);
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
- executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null);
+ executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+ clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
+ }
+
+ @Test(invocationTimeOut = 500)
+ public void testCorrectBackoffConfiguration() {
+ final Backoff backoff = consumer.getConnectionHandler().backoff;
+ Assert.assertEquals(backoff.backoffIntervalNanos(), DEFAULT_BACKOFF_INTERVAL_NANOS);
+ Assert.assertEquals(backoff.maxBackoffIntervalNanos(), MAX_BACKOFF_INTERVAL_NANOS);
}
@Test(invocationTimeOut = 1000)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 713007b..8907fc3 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -179,8 +179,9 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
if (msg != null) {
MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
- messageRetries.getNumRetries())) {
- Utils.sleep(100);
+ messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+ clientConf.getMaxBackoffIntervalNanos())) {
+ Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
} else {
// remove the message from the queue and emit to the topology, only if it should not be backedoff
LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());