You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:40:00 UTC

[bookkeeper] 02/04: [TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 71e0604ed60d6e1cd14cef41b7f041d57422dbfc
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 2 21:19:42 2018 -0700

    [TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    A storage container can move between servers due to failures, or it can take time to start on a server. During that short period of time, it is "unavailable" and clients will receive `Status.NOT_FOUND` from grpc channels. The client should retry on this error and attempt to re-locate the storage container again.
    
    *Modification*
    
    - add backoff policy in client settings
    - apply the backoff policy at the storage container channel. if it receives a `NOT_FOUND` grpc exception, it resets the server channel, so that subsequent requests will attempt to re-locate the storage container.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1379 from sijie/rpc_backoffs
---
 .../org/apache/bookkeeper/common/util/Backoff.java |  12 +-
 .../bookkeeper/clients/StorageClientImpl.java      |   3 +-
 .../clients/config/StorageClientSettings.java      |  12 ++
 .../impl/container/StorageContainerChannel.java    |  18 +-
 .../clients/impl/internal/LocationClientImpl.java  |   1 -
 .../clients/impl/internal/MetaRangeClientImpl.java |  15 +-
 .../clients/impl/internal/RootRangeClientImpl.java |  11 +-
 .../internal/RootRangeClientImplWithRetries.java   | 132 ++++++++++++
 .../internal/StorageServerClientManagerImpl.java   |  10 +-
 .../internal/mr/MetaRangeRequestProcessor.java     |  11 +-
 .../bookkeeper/clients/utils/ClientConstants.java  |  16 ++
 .../utils/ListenableFutureRpcProcessor.java        |  44 ++--
 .../apache/bookkeeper/clients/utils/RpcUtils.java  |  13 ++
 .../RootRangeClientImplWithRetriesTest.java        | 173 +++++++++++++++
 .../utils/ListenableFutureRpcProcessorTest.java    | 236 +++++++++++++++++++++
 .../bookkeeper/clients/utils/RpcUtilsTest.java     |  50 +++++
 .../java/base/src/test/resources/log4j.properties  |  51 +++++
 .../clients/impl/kv/PByteBufTableImpl.java         |   7 +-
 .../clients/impl/kv/PByteBufTableRangeImpl.java    |  21 +-
 .../clients/impl/kv/TableRequestProcessor.java     |  12 +-
 .../clients/impl/kv/TableRequestProcessorTest.java |   4 +-
 .../clients/impl/kv/TestPByteBufTableImpl.java     |   4 +-
 22 files changed, 811 insertions(+), 45 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
index 085e4cd..fdf8055 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 import lombok.Data;
 import lombok.ToString;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
 
 /**
  * Implements various backoff strategies.
@@ -34,6 +35,12 @@ import lombok.ToString;
  */
 public class Backoff {
 
+    public static final Policy DEFAULT = Jitter.of(
+        Type.EXPONENTIAL,
+        200,
+        2000,
+        3);
+
     private static final int MaxBitShift = 62;
 
     /**
@@ -95,7 +102,10 @@ public class Backoff {
     @ToString
     public static class Jitter implements Policy {
 
-        enum Type {
+        /**
+         * Jitter type.
+         */
+        public enum Type {
             DECORRELATED,
             EQUAL,
             EXPONENTIAL
diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 0cde4e2..aff322c 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -98,7 +98,8 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli
                     streamName,
                     props,
                     serverManager,
-                    scheduler.chooseThread(props.getStreamId())
+                    scheduler.chooseThread(props.getStreamId()),
+                    settings.backoffPolicy()
                 ).initialize();
             }),
             future
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 825e1d3..20b5821 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.inferred.freebuilder.FreeBuilder;
 
@@ -80,6 +82,15 @@ public interface StorageClientSettings {
     Optional<String> clientName();
 
     /**
+     * Configure a backoff policy for the client.
+     *
+     * <p>There are a few default backoff policies defined in {@link org.apache.bookkeeper.common.util.Backoff}.
+     *
+     * @return backoff policy provider
+     */
+    Backoff.Policy backoffPolicy();
+
+    /**
      * Builder of {@link StorageClientSettings} instances.
      */
     class Builder extends StorageClientSettings_Builder {
@@ -87,6 +98,7 @@ public interface StorageClientSettings {
         Builder() {
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
+            backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         }
 
         @Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 4006776..8635e0f 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -78,11 +78,25 @@ public class StorageContainerChannel {
         return rsChannelFuture;
     }
 
-    @VisibleForTesting
-    synchronized void resetStorageServerChannelFuture() {
+    public synchronized void resetStorageServerChannelFuture() {
         rsChannelFuture = null;
     }
 
+    public synchronized boolean resetStorageServerChannelFuture(CompletableFuture<StorageServerChannel> oldFuture) {
+        if (oldFuture != null) {
+            // we only reset the channel that we expect to reset
+            if (rsChannelFuture == oldFuture) {
+                rsChannelFuture = null;
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            rsChannelFuture = null;
+            return true;
+        }
+    }
+
     @VisibleForTesting
     public synchronized void setStorageServerChannelFuture(CompletableFuture<StorageServerChannel> rsChannelFuture) {
         this.rsChannelFuture = rsChannelFuture;
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
index fa02004..95cae9a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
@@ -105,7 +105,6 @@ public class LocationClientImpl implements LocationClient {
             }
             switch (status.getCode()) {
                 case INVALID_ARGUMENT:
-                case NOT_FOUND:
                 case ALREADY_EXISTS:
                 case PERMISSION_DENIED:
                 case UNAUTHENTICATED:
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index c84caeb..10481f1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -29,6 +29,8 @@ import org.apache.bookkeeper.clients.impl.container.StorageContainerChannelManag
 import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
 import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
 import org.apache.bookkeeper.clients.impl.internal.mr.MetaRangeRequestProcessor;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 
@@ -41,13 +43,23 @@ class MetaRangeClientImpl implements MetaRangeClient {
     private final StreamProperties streamProps;
     private final ScheduledExecutorService executor;
     private final StorageContainerChannel scClient;
+    private final Backoff.Policy backoffPolicy;
 
     MetaRangeClientImpl(StreamProperties streamProps,
                         OrderedScheduler scheduler,
                         StorageContainerChannelManager channelManager) {
+        this(streamProps, scheduler, channelManager, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+
+    }
+
+    MetaRangeClientImpl(StreamProperties streamProps,
+                        OrderedScheduler scheduler,
+                        StorageContainerChannelManager channelManager,
+                        Backoff.Policy backoffPolicy) {
         this.streamProps = streamProps;
         this.executor = scheduler.chooseThread(streamProps.getStreamId());
         this.scClient = channelManager.getOrCreate(streamProps.getStorageContainerId());
+        this.backoffPolicy = backoffPolicy;
     }
 
     @Override
@@ -71,7 +83,8 @@ class MetaRangeClientImpl implements MetaRangeClient {
                 streamProps),
             (response) -> createActiveRanges(response.getGetActiveRangesResp()),
             scClient,
-            executor
+            executor,
+            backoffPolicy
         ).process();
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
index 51ea9e3..72bb0e1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.clients.impl.internal;
 
 import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createRootRangeException;
+import static org.apache.bookkeeper.clients.utils.RpcUtils.isContainerNotFound;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
@@ -79,7 +80,15 @@ class RootRangeClientImpl implements RootRangeClient {
         ProcessRequestFunc<ReqT, RespT, RootRangeServiceFutureStub> processRequestFunc,
         ProcessResponseFunc<RespT, T> processResponseFunc) {
 
-        CompletableFuture<T> result = FutureUtils.createFuture();
+        CompletableFuture<T> result = FutureUtils.<T>createFuture()
+            .whenComplete((v, cause) -> {
+                if (null != cause && isContainerNotFound(cause)) {
+                    // if the rpc fails with `NOT_FOUND`, it means the storage container is not owned by any servers
+                    // yet. in this case, reset the storage server channel, so that subsequent retries are
+                    // forced to re-locate the containers.
+                    scClient.resetStorageServerChannelFuture();
+                }
+            });
         scClient.getStorageContainerChannelFuture().whenComplete((rsChannel, cause) -> {
             if (null != cause) {
                 handleGetRootRangeServiceFailure(result, cause);
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
new file mode 100644
index 0000000..d83d5a7
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+
+/**
+ * A root range client wrapper with retries.
+ */
+class RootRangeClientImplWithRetries implements RootRangeClient {
+
+    @VisibleForTesting
+    static final Predicate<Throwable> ROOT_RANGE_CLIENT_RETRY_PREDICATE =
+        cause -> shouldRetryOnException(cause);
+
+    private static boolean shouldRetryOnException(Throwable cause) {
+        if (cause instanceof StatusRuntimeException || cause instanceof StatusException) {
+            Status status;
+            if (cause instanceof StatusException) {
+                status = ((StatusException) cause).getStatus();
+            } else {
+                status = ((StatusRuntimeException) cause).getStatus();
+            }
+            switch (status.getCode()) {
+                case INVALID_ARGUMENT:
+                case ALREADY_EXISTS:
+                case PERMISSION_DENIED:
+                case UNAUTHENTICATED:
+                    return false;
+                default:
+                    return true;
+            }
+        } else if (cause instanceof RuntimeException) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private final RootRangeClient client;
+    private final Backoff.Policy backoffPolicy;
+    private final OrderedScheduler scheduler;
+
+    RootRangeClientImplWithRetries(RootRangeClient client,
+                                   Backoff.Policy backoffPolicy,
+                                   OrderedScheduler scheduler) {
+        this.client = client;
+        this.backoffPolicy = backoffPolicy;
+        this.scheduler = scheduler;
+    }
+
+    private <T> CompletableFuture<T> runRpcWithRetries(
+            Supplier<CompletableFuture<T>> futureSupplier) {
+        return Retries.run(
+            backoffPolicy.toBackoffs(),
+            ROOT_RANGE_CLIENT_RETRY_PREDICATE,
+            futureSupplier,
+            scheduler);
+    }
+
+    @Override
+    public CompletableFuture<NamespaceProperties> createNamespace(String namespace,
+                                                                  NamespaceConfiguration nsConf) {
+        return runRpcWithRetries(() -> client.createNamespace(namespace, nsConf));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteNamespace(String namespace) {
+        return runRpcWithRetries(() -> client.deleteNamespace(namespace));
+    }
+
+    @Override
+    public CompletableFuture<NamespaceProperties> getNamespace(String namespace) {
+        return runRpcWithRetries(() -> client.getNamespace(namespace));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> createStream(String nsName,
+                                                            String streamName,
+                                                            StreamConfiguration streamConf) {
+        return runRpcWithRetries(() ->
+            client.createStream(nsName, streamName, streamConf));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteStream(String nsName, String streamName) {
+        return runRpcWithRetries(() ->
+            client.deleteStream(nsName, streamName));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> getStream(String nsName, String streamName) {
+        return runRpcWithRetries(() ->
+            client.getStream(nsName, streamName));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> getStream(long streamId) {
+        return runRpcWithRetries(() ->
+            client.getStream(streamId));
+    }
+}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index 0e64abe..d219b6a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -79,9 +79,13 @@ public class StorageServerClientManagerImpl
             this.channelManager,
             this.locationClient,
             scheduler);
-        this.rootRangeClient = new RootRangeClientImpl(
-            scheduler,
-            scChannelManager);
+        this.rootRangeClient = new RootRangeClientImplWithRetries(
+            new RootRangeClientImpl(
+                scheduler,
+                scChannelManager),
+            settings.backoffPolicy(),
+            scheduler
+        );
         this.streamMetadataCache = new StreamMetadataCache(rootRangeClient);
         this.metaRangeClients = Maps.newConcurrentMap();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index f7f2480..cb25e84 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -41,8 +42,9 @@ public class MetaRangeRequestProcessor<RespT>
         StorageContainerRequest request,
         Function<StorageContainerResponse, T> responseFunc,
         StorageContainerChannel channel,
-        ScheduledExecutorService executor) {
-        return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor);
+        ScheduledExecutorService executor,
+        Backoff.Policy backoffPolicy) {
+        return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
     private final StorageContainerRequest request;
@@ -51,8 +53,9 @@ public class MetaRangeRequestProcessor<RespT>
     private MetaRangeRequestProcessor(StorageContainerRequest request,
                                       Function<StorageContainerResponse, RespT> responseFunc,
                                       StorageContainerChannel channel,
-                                      ScheduledExecutorService executor) {
-        super(channel, executor);
+                                      ScheduledExecutorService executor,
+                                      Backoff.Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
         this.request = request;
         this.responseFunc = responseFunc;
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
index 1fe486d..42b8fec 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
@@ -18,6 +18,10 @@
 
 package org.apache.bookkeeper.clients.utils;
 
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+
 /**
  * Client related constants.
  */
@@ -32,5 +36,17 @@ public final class ClientConstants {
     public static final int DEFAULT_BACKOFF_START_MS = 200;
     public static final int DEFAULT_BACKOFF_MAX_MS = 1000;
     public static final int DEFAULT_BACKOFF_MULTIPLIER = 2;
+    public static final int DEFAULT_BACKOFF_RETRIES = 3;
+
+    public static final Policy DEFAULT_BACKOFF_POLICY = Jitter.of(
+        Type.EXPONENTIAL,
+        DEFAULT_BACKOFF_START_MS,
+        DEFAULT_BACKOFF_MAX_MS,
+        DEFAULT_BACKOFF_RETRIES);
+
+    public static final Policy DEFAULT_INFINIT_BACKOFF_POLICY = Jitter.of(
+        Type.EXPONENTIAL,
+        DEFAULT_BACKOFF_START_MS,
+        DEFAULT_BACKOFF_MAX_MS);
 
 }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
index af49a9b..28ad7ca 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
@@ -33,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
 
 /**
  * A process for processing rpc request on storage container channel.
@@ -44,27 +44,22 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     FutureCallback<ResponseT>,
     Runnable {
 
-    private static final long startBackoffMs = 200;
-    private static final long maxBackoffMs = 2000;
-    private static final int maxRetries = 3;
-
     private final StorageContainerChannel scChannel;
     private final Iterator<Long> backoffs;
     private final ScheduledExecutorService executor;
     private final CompletableFuture<ResultT> resultFuture;
 
+    private CompletableFuture<StorageServerChannel> serverChannelFuture = null;
+
     protected ListenableFutureRpcProcessor(StorageContainerChannel channel,
-                                           ScheduledExecutorService executor) {
+                                           ScheduledExecutorService executor,
+                                           Policy backoffPolicy) {
         this.scChannel = channel;
-        this.backoffs = configureBackoffs();
+        this.backoffs = backoffPolicy.toBackoffs().iterator();
         this.resultFuture = FutureUtils.createFuture();
         this.executor = executor;
     }
 
-    protected Iterator<Long> configureBackoffs() {
-        return Backoff.exponentialJittered(startBackoffMs, maxBackoffMs).limit(maxRetries).iterator();
-    }
-
     /**
      * Create the rpc request for the processor.
      *
@@ -88,7 +83,8 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     protected abstract ResultT processResponse(ResponseT response) throws Exception;
 
     public CompletableFuture<ResultT> process() {
-        scChannel.getStorageContainerChannelFuture().whenCompleteAsync(this, executor);
+        serverChannelFuture = scChannel.getStorageContainerChannelFuture();
+        serverChannelFuture.whenCompleteAsync(this, executor);
         return resultFuture;
     }
 
@@ -106,7 +102,9 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     @Override
     public void accept(StorageServerChannel storageServerChannel, Throwable cause) {
         if (null != cause) {
-            // failure to retrieve a channel to the server that hosts this storage container
+            // The `StorageContainerChannel` already retries on failures related to the server channel,
+            // so we don't need to retry here if we failed to retrieve a channel to the server
+            // that hosts this storage container.
             resultFuture.completeExceptionally(cause);
             return;
         }
@@ -141,14 +139,26 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
 
     @Override
     public void onFailure(Throwable t) {
-        boolean shouldRetry = false;
+        Status status = null;
         if (t instanceof StatusRuntimeException) {
-            shouldRetry = shouldRetryOn(((StatusRuntimeException) t).getStatus());
+            status = ((StatusRuntimeException) t).getStatus();
         } else if (t instanceof StatusException) {
-            shouldRetry = shouldRetryOn(((StatusException) t).getStatus());
+            status = ((StatusException) t).getStatus();
+        }
+
+        if (Status.NOT_FOUND == status) {
+            // `NOT_FOUND` means storage container is not found. that means:
+            //
+            // - the container is moved to a different server
+            // - the container is not assigned to any servers yet
+            // - the container is assigned, but it is still starting up and not ready for serving
+            //
+            // in all cases, we need to reset the storage server channel, so the next retry can attempt to re-locate
+            // the storage container again
+            scChannel.resetStorageServerChannelFuture(serverChannelFuture);
         }
 
-        if (shouldRetry && backoffs.hasNext()) {
+        if (shouldRetryOn(status) && backoffs.hasNext()) {
             long backoffMs = backoffs.next();
             executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS);
         } else {
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
index d05bfff..46ca205 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
@@ -17,6 +17,9 @@ package org.apache.bookkeeper.clients.utils;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -48,6 +51,16 @@ public class RpcUtils {
         void process(RespT resp, CompletableFuture<T> resultFuture);
     }
 
+    public static boolean isContainerNotFound(Throwable cause) {
+        if (cause instanceof StatusRuntimeException) {
+            return Status.NOT_FOUND ==  ((StatusRuntimeException) cause).getStatus();
+        } else if (cause instanceof StatusException) {
+            return Status.NOT_FOUND ==  ((StatusException) cause).getStatus();
+        } else {
+            return false;
+        }
+    }
+
     public static <T, ReqT, RespT, ServiceT> void processRpc(
         ServiceT service,
         CompletableFuture<T> result,
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
new file mode 100644
index 0000000..fa16002
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RootRangeClientImplWithRetries}.
+ */
+public class RootRangeClientImplWithRetriesTest {
+
+    private static final int NUM_RETRIES = 3;
+
+    private static final String NS_NAME = "test-namespace";
+    private static final NamespaceConfiguration NS_CONF = NamespaceConfiguration.newBuilder().build();
+    private static final NamespaceProperties NS_PROPS = NamespaceProperties.newBuilder().build();
+    private static final String STREAM_NAME = "test-stream";
+    private static final StreamConfiguration STREAM_CONF = StreamConfiguration.newBuilder().build();
+    private static final StreamProperties STREAM_PROPS = StreamProperties.newBuilder().build();
+
+    private AtomicInteger callCounter;
+    private RootRangeClient client;
+    private OrderedScheduler scheduler;
+    private RootRangeClientImplWithRetries clientWithRetries;
+
+    @Before
+    public void setup() {
+        this.callCounter = new AtomicInteger(NUM_RETRIES);
+        this.client = mock(RootRangeClient.class);
+        this.scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+        this.clientWithRetries = new RootRangeClientImplWithRetries(
+            client,
+            Backoff.Constant.of(10, NUM_RETRIES),
+            scheduler);
+    }
+
+    @Test
+    public void testCreateNamespace() throws Exception {
+        when(client.createNamespace(anyString(), any(NamespaceConfiguration.class)))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(NS_PROPS);
+                }
+            });
+
+        assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.createNamespace(NS_NAME, NS_CONF)));
+    }
+
+    @Test
+    public void testDeleteNamespace() throws Exception {
+        when(client.deleteNamespace(anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(true);
+                }
+            });
+
+        assertTrue(FutureUtils.result(clientWithRetries.deleteNamespace(NS_NAME)));
+    }
+
+    @Test
+    public void testGetNamespace() throws Exception {
+        when(client.getNamespace(anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(NS_PROPS);
+                }
+            });
+
+        assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.getNamespace(NS_NAME)));
+    }
+
+    @Test
+    public void testCreateStream() throws Exception {
+        when(client.createStream(anyString(), anyString(), any(StreamConfiguration.class)))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.createStream(NS_NAME, STREAM_NAME, STREAM_CONF)));
+    }
+
+    @Test
+    public void testDeleteStream() throws Exception {
+        when(client.deleteStream(anyString(), anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(true);
+                }
+            });
+
+        assertTrue(FutureUtils.result(clientWithRetries.deleteStream(NS_NAME, STREAM_NAME)));
+    }
+
+    @Test
+    public void testGetStream() throws Exception {
+        when(client.getStream(anyString(), anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(NS_NAME, STREAM_NAME)));
+    }
+
+    @Test
+    public void testGetStreamById() throws Exception {
+        when(client.getStream(anyLong()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(1234L)));
+    }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
new file mode 100644
index 0000000..2c229cb
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ListenableFutureRpcProcessor}.
+ */
+public class ListenableFutureRpcProcessorTest {
+
+    private ListenableFutureRpcProcessor<String, String, String> processor;
+    private StorageContainerChannel scChannel;
+    private ScheduledExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        scChannel = mock(StorageContainerChannel.class);
+        processor = spy(new ListenableFutureRpcProcessor<String, String, String>(
+            scChannel, executor, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY) {
+            @Override
+            protected String createRequest() {
+                return null;
+            }
+
+            @Override
+            protected ListenableFuture<String> sendRPC(StorageServerChannel rsChannel, String s) {
+                return null;
+            }
+
+            @Override
+            protected String processResponse(String response) throws Exception {
+                return null;
+            }
+        });
+    }
+
+    @Test
+    public void testFailToConnect() {
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // inject channel failure
+        Exception testExc = new Exception("test-exception");
+        serverFuture.completeExceptionally(testExc);
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should fail the process if failed to connect to storage server");
+        } catch (Exception e) {
+            assertSame(testExc, e);
+        }
+    }
+
+    @Test
+    public void testProcessSuccessfully() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future to return the response
+        rpcFuture.set(response);
+
+        assertEquals(result, resultFuture.get());
+    }
+
+    @Test
+    public void testProcessResponseException() throws Exception {
+        String request = "request";
+        String response = "response";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        Exception testException = new Exception("test-exception");
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenThrow(testException);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future to return the response
+        rpcFuture.set(response);
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should throw exception on processing result");
+        } catch (Exception e) {
+            assertSame(testException, e);
+        }
+    }
+
+    @Test
+    public void testProcessRpcException() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future with `Status.INTERNAL`
+        rpcFuture.setException(new StatusRuntimeException(Status.INTERNAL));
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should throw fail immediately if rpc request failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof StatusRuntimeException);
+            StatusRuntimeException sre = (StatusRuntimeException) e;
+            assertEquals(Status.INTERNAL, sre.getStatus());
+        }
+    }
+
+    @Test
+    public void testProcessRetryNotFoundRpcException() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        AtomicInteger numRpcs = new AtomicInteger(0);
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenAnswer(invocationOnMock -> {
+            SettableFuture<String> rpcFuture = SettableFuture.create();
+            if (numRpcs.getAndIncrement() > 2) {
+                rpcFuture.set(response);
+            } else {
+                rpcFuture.setException(new StatusRuntimeException(Status.NOT_FOUND));
+            }
+            return rpcFuture;
+        });
+
+        CompletableFuture<String> resultFuture = processor.process();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        assertEquals(result, FutureUtils.result(resultFuture));
+        verify(scChannel, times(4)).getStorageContainerChannelFuture();
+    }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
new file mode 100644
index 0000000..d97d4e4
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RpcUtils}.
+ */
+public class RpcUtilsTest {
+
+    @Test
+    public void testIsContainerNotFound() {
+        StatusRuntimeException trueSRE = new StatusRuntimeException(Status.NOT_FOUND);
+        assertTrue(RpcUtils.isContainerNotFound(trueSRE));
+        StatusRuntimeException falseSRE = new StatusRuntimeException(Status.INTERNAL);
+        assertFalse(RpcUtils.isContainerNotFound(falseSRE));
+
+        StatusException trueSE = new StatusException(Status.NOT_FOUND);
+        assertTrue(RpcUtils.isContainerNotFound(trueSE));
+        StatusException falseSE = new StatusException(Status.INTERNAL);
+        assertFalse(RpcUtils.isContainerNotFound(falseSE));
+
+        Exception unknownException = new Exception("unknown");
+        assertFalse(RpcUtils.isContainerNotFound(unknownException));
+    }
+
+}
diff --git a/stream/clients/java/base/src/test/resources/log4j.properties b/stream/clients/java/base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9405038
--- /dev/null
+++ b/stream/clients/java/base/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Root logger: INFO level, console output only
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper log level to INFO
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=stream.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
index 4cb7dc1..1bda971 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManage
 import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.router.ByteBufHashRouter;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 
 /**
@@ -157,7 +158,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
     public PByteBufTableImpl(String streamName,
                              StreamProperties props,
                              StorageServerClientManager clientManager,
-                             ScheduledExecutorService executor) {
+                             ScheduledExecutorService executor,
+                             Backoff.Policy backoffPolicy) {
         this(
             streamName,
             props,
@@ -171,7 +173,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
                     executorService,
                     opFactory,
                     resultFactory,
-                    kvFactory),
+                    kvFactory,
+                    backoffPolicy),
             Optional.empty());
     }
 
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 1eb6ec8..03c64f7 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
@@ -58,6 +59,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
     private final OpFactory<ByteBuf, ByteBuf> opFactory;
     private final ResultFactory<ByteBuf, ByteBuf> resultFactory;
     private final KeyValueFactory<ByteBuf, ByteBuf> kvFactory;
+    private final Backoff.Policy backoffPolicy;
 
     PByteBufTableRangeImpl(long streamId,
                            RangeProperties rangeProps,
@@ -65,7 +67,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                            ScheduledExecutorService executor,
                            OpFactory<ByteBuf, ByteBuf> opFactory,
                            ResultFactory<ByteBuf, ByteBuf> resultFactory,
-                           KeyValueFactory<ByteBuf, ByteBuf> kvFactory) {
+                           KeyValueFactory<ByteBuf, ByteBuf> kvFactory,
+                           Backoff.Policy backoffPolicy) {
         this.streamId = streamId;
         this.rangeProps = rangeProps;
         this.scChannel = scChannel;
@@ -73,6 +76,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
         this.opFactory = opFactory;
         this.resultFactory = resultFactory;
         this.kvFactory = kvFactory;
+        this.backoffPolicy = backoffPolicy;
     }
 
     private RoutingHeader.Builder newRoutingHeader(ByteBuf pKey) {
@@ -97,7 +101,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((value, cause) -> {
             pKey.release();
             lKey.release();
@@ -122,7 +127,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -146,7 +152,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -170,7 +177,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -246,7 +254,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     txnBuilder.setHeader(newRoutingHeader(pKey))),
                 response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
                 scChannel,
-                executor
+                executor,
+                backoffPolicy
             ).process().whenComplete((ignored, cause) -> {
                 pKey.release();
                 for (AutoCloseable resource : resourcesToRelease) {
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
index 1b9d4cb..f5e0bc5 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
@@ -18,10 +18,12 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -36,8 +38,9 @@ public class TableRequestProcessor<RespT>
         StorageContainerRequest request,
         Function<StorageContainerResponse, T> responseFunc,
         StorageContainerChannel channel,
-        ScheduledExecutorService executor) {
-        return new TableRequestProcessor<>(request, responseFunc, channel, executor);
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
     private final StorageContainerRequest request;
@@ -46,8 +49,9 @@ public class TableRequestProcessor<RespT>
     private TableRequestProcessor(StorageContainerRequest request,
                                   Function<StorageContainerResponse, RespT> respFunc,
                                   StorageContainerChannel channel,
-                                  ScheduledExecutorService executor) {
-        super(channel, executor);
+                                  ScheduledExecutorService executor,
+                                  Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
         this.request = request;
         this.responseFunc = respFunc;
     }
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
index fc93384..bec76e5 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
@@ -38,6 +38,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
@@ -206,7 +207,8 @@ public class TableRequestProcessorTest extends GrpcClientTestBase {
             request,
             resp -> "test",
             scChannel,
-            scheduler);
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         assertEquals("test", FutureUtils.result(processor.process()));
         assertSame(request, receivedRequest.get());
         assertEquals(type, receivedRequestType.get());
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
index c2adc66..d25c3c9 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
 import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
 import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.router.HashRouter;
 import org.apache.bookkeeper.common.util.Bytes;
@@ -135,7 +136,8 @@ public class TestPByteBufTableImpl {
             runtime.getMethodName(),
             streamProps,
             mockClientManager,
-            scheduler.chooseThread(1));
+            scheduler.chooseThread(1),
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         try {
             FutureUtils.result(table.initialize());
             fail("Should fail initializing the table with exception " + cause);

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.