You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/09/24 20:48:44 UTC

[pulsar] branch master updated: Fix client backoff (#5261)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee42cf4  Fix client backoff (#5261)
ee42cf4 is described below

commit ee42cf403349154483d66f60a2c6ae8dd8518280
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Sep 24 13:48:37 2019 -0700

    Fix client backoff (#5261)
    
    * fix client backoff
    
    * fix tests
    
    * cleaning up
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  5 ++-
 .../org/apache/pulsar/client/impl/Backoff.java     | 32 +++--------------
 .../apache/pulsar/client/impl/BackoffBuilder.java  | 16 ++-------
 .../client/impl/BinaryProtoLookupService.java      |  2 --
 .../pulsar/client/impl/ClientBuilderImpl.java      |  2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 42 ++++++----------------
 .../client/impl/MultiTopicsConsumerImpl.java       | 15 ++++----
 .../apache/pulsar/client/impl/ProducerImpl.java    |  6 ++--
 .../pulsar/client/impl/PulsarClientImpl.java       |  5 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  5 +--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 .../client/impl/conf/ClientConfigurationData.java  |  4 +--
 .../pulsar/client/impl/ConsumerImplTest.java       | 23 ++++++------
 .../java/org/apache/pulsar/storm/PulsarSpout.java  |  4 +--
 14 files changed, 49 insertions(+), 114 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index c807ee5..a59349d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,9 +112,8 @@ public class RawReaderImpl implements RawReader {
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
-                Schema.BYTES, null,
-                client.getConfiguration().getDefaultBackoffIntervalNanos(),
-                client.getConfiguration().getMaxBackoffIntervalNanos());
+                Schema.BYTES, null
+            );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index 868c375..de25cf7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -19,16 +19,18 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
 import java.time.Clock;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 // All variables are in TimeUnit millis by default
+@Data
 public class Backoff {
     public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
     public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
-    private final long backoffIntervalNanos;
-    private final long maxBackoffIntervalNanos;
     private final long initial;
     private final long max;
     private final Clock clock;
@@ -42,33 +44,19 @@ public class Backoff {
 
     @VisibleForTesting
     Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
-            TimeUnit unitMandatoryStop, Clock clock, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+            TimeUnit unitMandatoryStop, Clock clock) {
         this.initial = unitInitial.toMillis(initial);
         this.max = unitMax.toMillis(max);
         this.next = this.initial;
         this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
         this.clock = clock;
-        this.backoffIntervalNanos = backoffIntervalNanos;
-        this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
     }
 
-    @VisibleForTesting
-    Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
-            TimeUnit unitMandatoryStop, Clock clock) {
-        this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
-    		 Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
-    }
     public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
                    TimeUnit unitMandatoryStop) {
         this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
     }
 
-    public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
-                   TimeUnit unitMandatoryStop, long backoffIntervalMs, long maxBackoffIntervalMs) {
-        this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
-    	     backoffIntervalMs, maxBackoffIntervalMs);
-    }
-
     public long next() {
         long current = this.next;
         if (current < max) {
@@ -115,16 +103,6 @@ public class Backoff {
         return firstBackoffTimeInMillis;
     }
 
-    @VisibleForTesting
-    long backoffIntervalNanos() {
-    	return backoffIntervalNanos;
-    }
-
-    @VisibleForTesting
-    long maxBackoffIntervalNanos() {
-    	return maxBackoffIntervalNanos;
-    }
-
     public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
     									long defaultInterval, long maxBackoffInterval) {
     	long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index aa997d7..a1c7614 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -61,21 +61,9 @@ public class BackoffBuilder {
     	this.unitMandatoryStop = unitMandatoryStop;
     	return this;
     }
-    
-    public BackoffBuilder useDefaultBackoffIntervals() {
-    	return useUserConfiguredIntervals(Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, 
-    			                          Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS );
-    }
-    
-    public BackoffBuilder useUserConfiguredIntervals(long backoffIntervalNanos, 
-                                                     long maxBackoffIntervalNanos) {
-    	this.backoffIntervalNanos = backoffIntervalNanos;
-    	this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
-    	return this;
-    }
+
     
     public Backoff create() {
-    	return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock, 
-    			           backoffIntervalNanos, maxBackoffIntervalNanos);
+    	return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock);
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index fae3a4f..b84a8fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -213,8 +213,6 @@ public class BinaryProtoLookupService implements LookupService {
                 .setInitialTime(100, TimeUnit.MILLISECONDS)
                 .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
                 .setMax(0, TimeUnit.MILLISECONDS)
-                .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
-                                            client.getConfiguration().getMaxBackoffIntervalNanos())
                 .create();
         getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
         return topicsFuture;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index c65b7c2..16ea688 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -211,7 +211,7 @@ public class ClientBuilderImpl implements ClientBuilder {
 
     @Override
     public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
-    	conf.setDefaultBackoffIntervalNanos(unit.toNanos(duration));
+    	conf.setInitialBackoffIntervalNanos(unit.toNanos(duration));
     	return this;
     }
     
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6881b53..b13e2fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -40,7 +40,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -140,9 +139,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private Producer<T> deadLetterProducer;
 
-    private final long backoffIntervalNanos;
-    private final long maxBackoffIntervalNanos;
-
     protected volatile boolean paused;
 
     enum SubscriptionMode {
@@ -155,29 +151,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                 ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode,
-                                            startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
-    }
-
-    static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-            SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
-            long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+                                               ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+                                               SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
     	if (conf.getReceiverQueueSize() == 0) {
             return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+                    subscriptionMode, startMessageId, schema, interceptors);
         } else {
             return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+                    subscriptionMode, startMessageId, schema, interceptors);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                 ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
-                 long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+                           ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+                           SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -225,13 +212,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         this.connectionHandler = new ConnectionHandler(this,
         		        new BackoffBuilder()
-                           .setInitialTime(100, TimeUnit.MILLISECONDS)
-                           .setMax(60, TimeUnit.SECONDS)
-                           .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                           .useUserConfiguredIntervals(backoffIntervalNanos,
-        	                                           maxBackoffIntervalNanos)
-        	               .create(),
-                        this);
+                                .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+                                .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+                                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                                .create(),
+                this);
 
         this.topicName = TopicName.get(topic);
         if (this.topicName.isPersistent()) {
@@ -260,9 +245,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             possibleSendToDeadLetterTopicMessages = null;
         }
 
-        this.backoffIntervalNanos = backoffIntervalNanos;
-        this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
-
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
         grabCnx();
@@ -1498,8 +1480,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 .setInitialTime(100, TimeUnit.MILLISECONDS)
                 .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
                 .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                .useUserConfiguredIntervals(backoffIntervalNanos,
-                                            maxBackoffIntervalNanos)
                 .create();
 
         CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 761f39b..3986285 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
-import com.google.protobuf.MapEntry;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
@@ -768,9 +767,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                 configurationData, client.externalExecutorProvider().getExecutor(),
                                 partitionIndex, true, subFuture,
-                                SubscriptionMode.Durable, null, schema, interceptors,
-                                client.getConfiguration().getDefaultBackoffIntervalNanos(),
-                                client.getConfiguration().getMaxBackoffIntervalNanos());
+                                SubscriptionMode.Durable, null, schema, interceptors
+                        );
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         return subFuture;
                     })
@@ -782,8 +780,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                     client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
-                    schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
-                    client.getConfiguration().getMaxBackoffIntervalNanos());
+                    schema, interceptors
+            );
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
@@ -999,9 +997,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
-                            client.getConfiguration().getDefaultBackoffIntervalNanos(),
-                            client.getConfiguration().getMaxBackoffIntervalNanos());
+                            partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors
+                        );
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 7d446a0..54d2eb2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -188,11 +188,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         this.connectionHandler = new ConnectionHandler(this,
         	new BackoffBuilder()
-        	    .setInitialTime(100, TimeUnit.MILLISECONDS)
-			    .setMax(60, TimeUnit.SECONDS)
+        	    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+			    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
 			    .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
-			    .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
-			                                client.getConfiguration().getMaxBackoffIntervalNanos())
 			    .create(),
             this);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4a8b487..ea23f46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -351,8 +350,8 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 int partitionIndex = TopicName.getPartitionIndex(topic);
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
-                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
-                        this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
+                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors
+                );
             }
 
             synchronized (consumers) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index df71360..fd9db1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,8 +83,9 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
-                client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
+                partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null
+
+        );
         
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 3f3f5bd..3b0b257 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -59,7 +59,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
         super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
-              schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
+              schema, interceptors);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index e944896..af478ce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -67,8 +67,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private int keepAliveIntervalSeconds = 30;
     private int connectionTimeoutMs = 10000;
     private int requestTimeoutMs = 60000;
-    private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
-    private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
+    private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
+    private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
 
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index f463259..34112e5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -38,8 +38,6 @@ import static org.mockito.Mockito.*;
 
 public class ConsumerImplTest {
 
-	private static final long DEFAULT_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
-	private static final long MAX_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(20);
 
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private ConsumerImpl<ConsumerImpl> consumer;
@@ -58,21 +56,12 @@ public class ConsumerImplTest {
         when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
-        clientConf.setDefaultBackoffIntervalNanos(DEFAULT_BACKOFF_INTERVAL_NANOS);
-        clientConf.setMaxBackoffIntervalNanos(MAX_BACKOFF_INTERVAL_NANOS);
         when(client.getConfiguration()).thenReturn(clientConf);
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
-                clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
-    }
-
-    @Test(invocationTimeOut = 500)
-    public void testCorrectBackoffConfiguration() {
-    	final Backoff backoff = consumer.getConnectionHandler().backoff;
-    	Assert.assertEquals(backoff.backoffIntervalNanos(), DEFAULT_BACKOFF_INTERVAL_NANOS);
-    	Assert.assertEquals(backoff.maxBackoffIntervalNanos(), MAX_BACKOFF_INTERVAL_NANOS);
+                executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null
+        );
     }
 
     @Test(invocationTimeOut = 1000)
@@ -80,6 +69,14 @@ public class ConsumerImplTest {
         consumer.notifyPendingReceivedCallback(null, null);
     }
 
+    @Test(invocationTimeOut = 500)
+    public void testCorrectBackoffConfiguration() {
+        final Backoff backoff = consumer.getConnectionHandler().backoff;
+        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
+        Assert.assertEquals(backoff.getMax(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos()));
+        Assert.assertEquals(backoff.next(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos()));
+    }
+
     @Test(invocationTimeOut = 1000)
     public void testNotifyPendingReceivedCallback_CompleteWithException() {
         CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index ed92987..773cb2b 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -242,9 +242,9 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
                 // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing
                 // anything
                 if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
-                        messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+                        messageRetries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(),
                         clientConf.getMaxBackoffIntervalNanos())) {
-                    Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+                    Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
                 } else {
                     // remove the message from the queue and emit to the topology, only if it should not be backedoff
                     LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());