You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/02 16:41:14 UTC

[pulsar] branch master updated: [Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bdfc098  [Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
bdfc098 is described below

commit bdfc09867325a271dc40a3aa87dcefb905769f06
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Tue Apr 2 09:41:04 2019 -0700

    [Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
    
    
    Fixes #2122
    
    ### Motivation
    
    The current backoff strategy uses hard-coded default intervals and is too aggressive. This change makes the starting and maximum backoff intervals configurable by the user.
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not sure)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  4 +-
 .../apache/pulsar/client/api/ClientBuilder.java    | 22 ++++++
 .../org/apache/pulsar/client/impl/Backoff.java     | 54 ++++++++++++---
 .../apache/pulsar/client/impl/BackoffBuilder.java  | 81 ++++++++++++++++++++++
 .../client/impl/BinaryProtoLookupService.java      | 10 ++-
 .../pulsar/client/impl/ClientBuilderImpl.java      | 12 ++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 46 +++++++++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 11 ++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 ++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  3 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  4 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  | 10 ++-
 .../client/impl/conf/ClientConfigurationData.java  |  3 +
 .../pulsar/client/impl/ConsumerImplTest.java       | 16 ++++-
 .../java/org/apache/pulsar/storm/PulsarSpout.java  |  5 +-
 15 files changed, 257 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index fa527c1..d1619f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -111,7 +111,9 @@ public class RawReaderImpl implements RawReader {
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
-                Schema.BYTES, null);
+                Schema.BYTES, null,
+                client.getConfiguration().getDefaultBackoffIntervalNanos(),
+                client.getConfiguration().getMaxBackoffIntervalNanos());
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 9d30aba..a99d08b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -354,4 +354,26 @@ public interface ClientBuilder extends Cloneable {
      * @return the client builder instance
      */
     ClientBuilder connectionTimeout(int duration, TimeUnit unit);
+    
+    /**
+     * Set the duration of time for a backoff interval.
+     * 
+     * @param duration 
+     * 			 the duration of the interval
+     * @param unit
+     * 			 the time unit in which the duration is defined
+     * @return the client builder instance
+     */
+    ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);
+    
+    /**
+     * Set the maximum duration of time for a backoff interval.
+     * 
+     * @param duration 
+     * 			 the duration of the interval
+     * @param unit
+     * 			 the time unit in which the duration is defined
+     * @return the client builder instance
+     */
+    ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index aa506da..1cb2e1e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
 
 // All variables are in TimeUnit millis by default
 public class Backoff {
-    private static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
-    private static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
+    public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
+    public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
+    private final long backoffIntervalNanos;
+    private final long maxBackoffIntervalNanos;
     private final long initial;
     private final long max;
     private final Clock clock;
@@ -40,19 +42,33 @@ public class Backoff {
 
     @VisibleForTesting
     Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
-            TimeUnit unitMandatoryStop, Clock clock) {
+            TimeUnit unitMandatoryStop, Clock clock, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
         this.initial = unitInitial.toMillis(initial);
         this.max = unitMax.toMillis(max);
         this.next = this.initial;
         this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
         this.clock = clock;
+        this.backoffIntervalNanos = backoffIntervalNanos;
+        this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
     }
 
+    @VisibleForTesting
+    Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+            TimeUnit unitMandatoryStop, Clock clock) {
+    	this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock, 
+    		 Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+    }
     public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
-            TimeUnit unitMandatoryStop) {
+                   TimeUnit unitMandatoryStop) {
         this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
     }
 
+    public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+                   TimeUnit unitMandatoryStop, long backoffIntervalMs, long maxBackoffIntervalMs) {
+    	this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(), 
+    	     backoffIntervalMs, maxBackoffIntervalMs);
+    }
+    
     public long next() {
         long current = this.next;
         if (current < max) {
@@ -99,14 +115,25 @@ public class Backoff {
         return firstBackoffTimeInMillis;
     }
 
-    public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
-        long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
+    @VisibleForTesting
+    long backoffIntervalNanos() {
+    	return backoffIntervalNanos;
+    }
+    
+    @VisibleForTesting
+    long maxBackoffIntervalNanos() {
+    	return maxBackoffIntervalNanos;
+    }
+    
+    public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts, 
+    									long defaultInterval, long maxBackoffInterval) {
+    	long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
         long currentTime = System.nanoTime();
-        long interval = DEFAULT_INTERVAL_IN_NANOSECONDS;
+        long interval = defaultInterval;
         for (int i = 1; i < failedAttempts; i++) {
             interval = interval * 2;
-            if (interval > MAX_BACKOFF_INTERVAL_NANOSECONDS) {
-                interval = MAX_BACKOFF_INTERVAL_NANOSECONDS;
+            if (interval > maxBackoffInterval) {
+                interval = maxBackoffInterval;
                 break;
             }
         }
@@ -114,4 +141,13 @@ public class Backoff {
         // if the current time is less than the time at which next retry should occur, we should backoff
         return currentTime < (initialTimestampInNano + interval);
     }
+    
+    public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
+        return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, 
+        							 DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
+    }
+    
+    public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
+    	return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
new file mode 100644
index 0000000..aa997d7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class BackoffBuilder {
+	private long backoffIntervalNanos;
+    private long maxBackoffIntervalNanos;
+    private long initial;
+    private TimeUnit unitInitial;
+    private long max;
+    private TimeUnit unitMax;
+    private Clock clock;
+    private long mandatoryStop;
+    private TimeUnit unitMandatoryStop;
+    
+    @VisibleForTesting
+    BackoffBuilder() {
+        this.initial = 0;
+        this.max = 0;
+        this.mandatoryStop = 0;
+        this.clock = Clock.systemDefaultZone();
+        this.backoffIntervalNanos = 0;
+        this.maxBackoffIntervalNanos = 0;
+    }
+    
+    public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
+    	this.unitInitial = unitInitial;
+    	this.initial = initial;
+    	return this;
+    }
+    
+    public BackoffBuilder setMax(long max, TimeUnit unitMax) {
+    	this.unitMax = unitMax;
+    	this.max = max;
+    	return this;
+    }
+     
+    public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit unitMandatoryStop) {
+    	this.mandatoryStop = mandatoryStop;
+    	this.unitMandatoryStop = unitMandatoryStop;
+    	return this;
+    }
+    
+    public BackoffBuilder useDefaultBackoffIntervals() {
+    	return useUserConfiguredIntervals(Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, 
+    			                          Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS );
+    }
+    
+    public BackoffBuilder useUserConfiguredIntervals(long backoffIntervalNanos, 
+                                                     long maxBackoffIntervalNanos) {
+    	this.backoffIntervalNanos = backoffIntervalNanos;
+    	this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
+    	return this;
+    }
+    
+    public Backoff create() {
+    	return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock, 
+    			           backoffIntervalNanos, maxBackoffIntervalNanos);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8d07552..0e39f39 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -207,9 +207,13 @@ public class BinaryProtoLookupService implements LookupService {
         CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();
 
         AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
-            opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
-            0 , TimeUnit.MILLISECONDS);
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMax(0, TimeUnit.MILLISECONDS)
+                .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(), 
+                                            client.getConfiguration().getMaxBackoffIntervalNanos())
+                .create();
         getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
         return topicsFuture;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 5e00a30..a44d286 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -208,6 +208,18 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
+    	conf.setDefaultBackoffIntervalNanos(unit.toNanos(duration));
+    	return this;
+    }
+    
+    @Override
+    public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
+    	conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
+    	return this;
+    }
+    
     public ClientConfigurationData getClientConfigurationData() {
         return conf;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fc0c660..6dc96d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -138,6 +138,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private Producer<T> deadLetterProducer;
 
+    private final long backoffIntervalNanos;
+    private final long maxBackoffIntervalNanos;
+
     protected volatile boolean paused;
 
     enum SubscriptionMode {
@@ -152,18 +155,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        if (conf.getReceiverQueueSize() == 0) {
+        return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, 
+                                            startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+    }
+
+    static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+            SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+            long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+    	if (conf.getReceiverQueueSize() == 0) {
             return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors);
+                    subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
         } else {
             return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors);
+                    subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
         }
     }
-
+    
     protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+                 long backoffIntervalNanos, long maxBackoffIntervalNanos) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -208,8 +220,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
-            this);
+        		        new BackoffBuilder()
+        	               .setInitialTime(100, TimeUnit.MILLISECONDS)
+        	               .setMandatoryStop(60, TimeUnit.SECONDS)
+        	               .setMax(0, TimeUnit.MILLISECONDS)
+        	               .useUserConfiguredIntervals(backoffIntervalNanos, 
+        	                                           maxBackoffIntervalNanos)
+        	               .create(),
+                        this);
 
         this.topicName = TopicName.get(topic);
         if (this.topicName.isPersistent()) {
@@ -238,6 +256,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             possibleSendToDeadLetterTopicMessages = null;
         }
 
+        this.backoffIntervalNanos = backoffIntervalNanos;
+        this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
+
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
         grabCnx();
@@ -1448,9 +1469,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
-            opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
-            0 , TimeUnit.MILLISECONDS);
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMax(0, TimeUnit.MILLISECONDS)
+                .useUserConfiguredIntervals(backoffIntervalNanos, 
+                                            maxBackoffIntervalNanos)
+                .create();
+        
         CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();
 
         internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 71af9c7..69dd385 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -749,7 +749,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                 configurationData, client.externalExecutorProvider().getExecutor(),
                                 partitionIndex, subFuture,
-                                SubscriptionMode.Durable, null, schema, interceptors);
+                                SubscriptionMode.Durable, null, schema, interceptors,
+                                client.getConfiguration().getDefaultBackoffIntervalNanos(),
+                                client.getConfiguration().getMaxBackoffIntervalNanos());
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         return subFuture;
                     })
@@ -761,7 +763,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                     client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null,
-                    schema, interceptors);
+                    schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
+                    client.getConfiguration().getMaxBackoffIntervalNanos());
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
@@ -977,7 +980,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors);
+                            partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+                            client.getConfiguration().getDefaultBackoffIntervalNanos(),
+                            client.getConfiguration().getMaxBackoffIntervalNanos());
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0479539..c80e529 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -183,8 +183,15 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+        	new BackoffBuilder()
+        	    .setInitialTime(100, TimeUnit.MILLISECONDS)
+			    .setMandatoryStop(60, TimeUnit.SECONDS)
+			    .setMax(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
+			    .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(), 
+			                                client.getConfiguration().getMaxBackoffIntervalNanos())
+			    .create(),
             this);
+     
         grabCnx();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2e92c50..bdd3345 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -334,7 +334,8 @@ public class PulsarClientImpl implements PulsarClient {
                     listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
             } else {
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
-                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors);
+                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, 
+                        this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
             }
 
             synchronized (consumers) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 388e5c4..08ca1ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -85,7 +85,9 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+                client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
+        
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 09b610f..b0dc4b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -50,8 +50,16 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors) {
+    	this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+    		 schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
+    }
+    
+    public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+            SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
         super(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
-                schema, interceptors);
+              schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index f7bec63..db77bd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.conf;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
@@ -59,6 +60,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
     private int connectionTimeoutMs = 10000;
+    private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
+    private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
 
     public ClientConfigurationData clone() {
         try {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index e09184c..4e6bad6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -32,11 +32,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.*;
 
 public class ConsumerImplTest {
 
+	private static final long DEFAULT_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
+	private static final long MAX_BACKOFF_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(20);
+
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private ConsumerImpl<ConsumerImpl> consumer;
     private ConsumerConfigurationData consumerConf;
@@ -54,11 +58,21 @@ public class ConsumerImplTest {
         when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
+        clientConf.setDefaultBackoffIntervalNanos(DEFAULT_BACKOFF_INTERVAL_NANOS);
+        clientConf.setMaxBackoffIntervalNanos(MAX_BACKOFF_INTERVAL_NANOS);
         when(client.getConfiguration()).thenReturn(clientConf);
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null);
+                executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+                clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
+    }
+
+    @Test(invocationTimeOut = 500)
+    public void testCorrectBackoffConfiguration() {
+    	final Backoff backoff = consumer.getConnectionHandler().backoff;
+    	Assert.assertEquals(backoff.backoffIntervalNanos(), DEFAULT_BACKOFF_INTERVAL_NANOS);
+    	Assert.assertEquals(backoff.maxBackoffIntervalNanos(), MAX_BACKOFF_INTERVAL_NANOS);
     }
 
     @Test(invocationTimeOut = 1000)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 713007b..8907fc3 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -179,8 +179,9 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         if (msg != null) {
             MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
             if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
-                    messageRetries.getNumRetries())) {
-                Utils.sleep(100);
+                    messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(), 
+                    clientConf.getMaxBackoffIntervalNanos())) {
+                Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
             } else {
                 // remove the message from the queue and emit to the topology, only if it should not be backedoff
                 LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());