You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/09/24 20:48:44 UTC
[pulsar] branch master updated: Fix client backoff (#5261)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee42cf4 Fix client backoff (#5261)
ee42cf4 is described below
commit ee42cf403349154483d66f60a2c6ae8dd8518280
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Sep 24 13:48:37 2019 -0700
Fix client backoff (#5261)
* fix client backoff
* fix tests
* cleaning up
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 5 ++-
.../org/apache/pulsar/client/impl/Backoff.java | 32 +++--------------
.../apache/pulsar/client/impl/BackoffBuilder.java | 16 ++-------
.../client/impl/BinaryProtoLookupService.java | 2 --
.../pulsar/client/impl/ClientBuilderImpl.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 42 ++++++----------------
.../client/impl/MultiTopicsConsumerImpl.java | 15 ++++----
.../apache/pulsar/client/impl/ProducerImpl.java | 6 ++--
.../pulsar/client/impl/PulsarClientImpl.java | 5 ++-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 5 +--
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +-
.../client/impl/conf/ClientConfigurationData.java | 4 +--
.../pulsar/client/impl/ConsumerImplTest.java | 23 ++++++------
.../java/org/apache/pulsar/storm/PulsarSpout.java | 4 +--
14 files changed, 49 insertions(+), 114 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index c807ee5..a59349d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,9 +112,8 @@ public class RawReaderImpl implements RawReader {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
- Schema.BYTES, null,
- client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos());
+ Schema.BYTES, null
+ );
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index 868c375..de25cf7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -19,16 +19,18 @@
package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
import java.time.Clock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
// All variables are in TimeUnit millis by default
+@Data
public class Backoff {
public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
- private final long backoffIntervalNanos;
- private final long maxBackoffIntervalNanos;
private final long initial;
private final long max;
private final Clock clock;
@@ -42,33 +44,19 @@ public class Backoff {
@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop, Clock clock, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+ TimeUnit unitMandatoryStop, Clock clock) {
this.initial = unitInitial.toMillis(initial);
this.max = unitMax.toMillis(max);
this.next = this.initial;
this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
this.clock = clock;
- this.backoffIntervalNanos = backoffIntervalNanos;
- this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
}
- @VisibleForTesting
- Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop, Clock clock) {
- this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
- Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
- }
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
}
- public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop, long backoffIntervalMs, long maxBackoffIntervalMs) {
- this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
- backoffIntervalMs, maxBackoffIntervalMs);
- }
-
public long next() {
long current = this.next;
if (current < max) {
@@ -115,16 +103,6 @@ public class Backoff {
return firstBackoffTimeInMillis;
}
- @VisibleForTesting
- long backoffIntervalNanos() {
- return backoffIntervalNanos;
- }
-
- @VisibleForTesting
- long maxBackoffIntervalNanos() {
- return maxBackoffIntervalNanos;
- }
-
public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
long defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index aa997d7..a1c7614 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -61,21 +61,9 @@ public class BackoffBuilder {
this.unitMandatoryStop = unitMandatoryStop;
return this;
}
-
- public BackoffBuilder useDefaultBackoffIntervals() {
- return useUserConfiguredIntervals(Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS,
- Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS );
- }
-
- public BackoffBuilder useUserConfiguredIntervals(long backoffIntervalNanos,
- long maxBackoffIntervalNanos) {
- this.backoffIntervalNanos = backoffIntervalNanos;
- this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
- return this;
- }
+
public Backoff create() {
- return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
- backoffIntervalNanos, maxBackoffIntervalNanos);
+ return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index fae3a4f..b84a8fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -213,8 +213,6 @@ public class BinaryProtoLookupService implements LookupService {
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(0, TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos())
.create();
getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
return topicsFuture;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index c65b7c2..16ea688 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -211,7 +211,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
- conf.setDefaultBackoffIntervalNanos(unit.toNanos(duration));
+ conf.setInitialBackoffIntervalNanos(unit.toNanos(duration));
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6881b53..b13e2fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -40,7 +40,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -140,9 +139,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private Producer<T> deadLetterProducer;
- private final long backoffIntervalNanos;
- private final long maxBackoffIntervalNanos;
-
protected volatile boolean paused;
enum SubscriptionMode {
@@ -155,29 +151,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode,
- startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
- }
-
- static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
- long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+ subscriptionMode, startMessageId, schema, interceptors);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+ subscriptionMode, startMessageId, schema, interceptors);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
- long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+ ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
@@ -225,13 +212,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(60, TimeUnit.SECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(backoffIntervalNanos,
- maxBackoffIntervalNanos)
- .create(),
- this);
+ .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+ .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create(),
+ this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
@@ -260,9 +245,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleSendToDeadLetterTopicMessages = null;
}
- this.backoffIntervalNanos = backoffIntervalNanos;
- this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
-
topicNameWithoutPartition = topicName.getPartitionedTopicName();
grabCnx();
@@ -1498,8 +1480,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(backoffIntervalNanos,
- maxBackoffIntervalNanos)
.create();
CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 761f39b..3986285 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
-import com.google.protobuf.MapEntry;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
@@ -768,9 +767,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
- SubscriptionMode.Durable, null, schema, interceptors,
- client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos());
+ SubscriptionMode.Durable, null, schema, interceptors
+ );
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
@@ -782,8 +780,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
- schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos());
+ schema, interceptors
+ );
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
@@ -999,9 +997,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
- client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos());
+ partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors
+ );
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 7d446a0..54d2eb2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -188,11 +188,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(60, TimeUnit.SECONDS)
+ .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+ .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
- client.getConfiguration().getMaxBackoffIntervalNanos())
.create(),
this);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4a8b487..ea23f46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
@@ -351,8 +350,8 @@ public class PulsarClientImpl implements PulsarClient {
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
- consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
- this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
+ consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors
+ );
}
synchronized (consumers) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index df71360..fd9db1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,8 +83,9 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
- client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
+ partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null
+
+ );
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 3f3f5bd..3b0b257 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -59,7 +59,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
- schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+ schema, interceptors);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index e944896..af478ce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -67,8 +67,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int keepAliveIntervalSeconds = 30;
private int connectionTimeoutMs = 10000;
private int requestTimeoutMs = 60000;
- private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
- private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
+ private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
+ private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
@JsonIgnore
private Clock clock = Clock.systemDefaultZone();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index f463259..34112e5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -38,8 +38,6 @@ import static org.mockito.Mockito.*;
public class ConsumerImplTest {
- private static final long DEFAULT_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
- private static final long MAX_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(20);
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private ConsumerImpl<ConsumerImpl> consumer;
@@ -58,21 +56,12 @@ public class ConsumerImplTest {
when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
- clientConf.setDefaultBackoffIntervalNanos(DEFAULT_BACKOFF_INTERVAL_NANOS);
- clientConf.setMaxBackoffIntervalNanos(MAX_BACKOFF_INTERVAL_NANOS);
when(client.getConfiguration()).thenReturn(clientConf);
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
- executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
- clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
- }
-
- @Test(invocationTimeOut = 500)
- public void testCorrectBackoffConfiguration() {
- final Backoff backoff = consumer.getConnectionHandler().backoff;
- Assert.assertEquals(backoff.backoffIntervalNanos(), DEFAULT_BACKOFF_INTERVAL_NANOS);
- Assert.assertEquals(backoff.maxBackoffIntervalNanos(), MAX_BACKOFF_INTERVAL_NANOS);
+ executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null
+ );
}
@Test(invocationTimeOut = 1000)
@@ -80,6 +69,14 @@ public class ConsumerImplTest {
consumer.notifyPendingReceivedCallback(null, null);
}
+ @Test(invocationTimeOut = 500)
+ public void testCorrectBackoffConfiguration() {
+ final Backoff backoff = consumer.getConnectionHandler().backoff;
+ ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
+ Assert.assertEquals(backoff.getMax(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos()));
+ Assert.assertEquals(backoff.next(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos()));
+ }
+
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_CompleteWithException() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index ed92987..773cb2b 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -242,9 +242,9 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
// emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing
// anything
if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
- messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+ messageRetries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(),
clientConf.getMaxBackoffIntervalNanos())) {
- Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+ Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
} else {
// remove the message from the queue and emit to the topology, only if it should not be backedoff
LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());