You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:39:58 UTC

[bookkeeper] branch branch-4.7 updated (8100719 -> 6c881f7)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a change to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git.


    from 8100719  Use unpooled buffers in BufferedChannel
     new 7f0d8ad  [TABLE SERVICE] Dlog based checkpoint store
     new 71e0604  [TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found
     new c495cf0  ISSUE #1384: Bump checkstyle version
     new 6c881f7  [TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/conf/ComponentConfiguration.java        |  41 ++--
 .../common/conf/ConcurrentConfiguration.java       |  28 +--
 .../org/apache/bookkeeper/common/util/Backoff.java |  12 +-
 pom.xml                                            |   2 +-
 .../bookkeeper/clients/StorageClientImpl.java      |   3 +-
 .../clients/config/StorageClientSettings.java      |  12 +
 .../impl/container/StorageContainerChannel.java    |  18 +-
 .../clients/impl/internal/LocationClientImpl.java  |   1 -
 .../clients/impl/internal/MetaRangeClientImpl.java |  15 +-
 .../clients/impl/internal/RootRangeClientImpl.java |  11 +-
 .../internal/RootRangeClientImplWithRetries.java   | 132 +++++++++++
 .../internal/StorageServerClientManagerImpl.java   |  10 +-
 .../internal/mr/MetaRangeRequestProcessor.java     |  11 +-
 .../bookkeeper/clients/utils/ClientConstants.java  |  16 ++
 .../utils/ListenableFutureRpcProcessor.java        |  44 ++--
 .../apache/bookkeeper/clients/utils/RpcUtils.java  |  13 ++
 .../RootRangeClientImplWithRetriesTest.java        | 173 ++++++++++++++
 .../utils/ListenableFutureRpcProcessorTest.java    | 236 +++++++++++++++++++
 .../bookkeeper/clients/utils/RpcUtilsTest.java     |  50 ++++
 .../src/test/resources/log4j.properties            |   0
 .../clients/impl/kv/PByteBufTableImpl.java         |   7 +-
 .../clients/impl/kv/PByteBufTableRangeImpl.java    |  21 +-
 .../clients/impl/kv/TableRequestProcessor.java     |  12 +-
 .../clients/impl/kv/TableRequestProcessorTest.java |   4 +-
 .../clients/impl/kv/TestPByteBufTableImpl.java     |   4 +-
 .../common/config/ConcurrentBaseConfiguration.java |  45 +---
 .../DistributedLogConfiguration.java               |  14 +-
 .../impl/metadata/ZKLogStreamMetadataStore.java    |   2 +
 .../org/apache/distributedlog/fs/DLFileSystem.java |  15 +-
 .../bookkeeper/stream/cluster/StreamCluster.java   |  24 +-
 .../bookkeeper/stream/server/StorageServer.java    |   2 +
 .../stream/server/conf/DLConfiguration.java        |   4 +-
 .../server/conf/StorageServerConfiguration.java    |   2 +-
 .../server/service/DLNamespaceProviderService.java |   6 +-
 .../rocksdb/checkpoint/dlog/DLCheckpointStore.java | 154 +++++++++++++
 .../rocksdb/checkpoint/dlog}/DLInputStream.java    |  16 +-
 .../rocksdb/checkpoint/dlog}/DLOutputStream.java   |   2 +-
 .../rocksdb/checkpoint/dlog}/package-info.java     |   4 +-
 .../checkpoint/dlog/DLCheckpointStoreTest.java     | 256 +++++++++++++++++++++
 .../stream/storage/conf/StorageConfiguration.java  |   2 +-
 .../storage/impl/store/MVCCStoreFactoryImpl.java   |  17 +-
 .../impl/store/MVCCStoreFactoryImplTest.java       |   1 +
 42 files changed, 1277 insertions(+), 165 deletions(-)
 copy stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java => bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java (66%)
 create mode 100644 stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
 create mode 100644 stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
 create mode 100644 stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
 create mode 100644 stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
 copy stream/clients/java/{kv => base}/src/test/resources/log4j.properties (100%)
 create mode 100644 stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
 copy stream/{distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs => statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog}/DLInputStream.java (95%)
 copy stream/{distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs => statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog}/DLOutputStream.java (98%)
 copy {bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/exceptions => stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog}/package-info.java (88%)
 create mode 100644 stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 04/04: [TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 6c881f71554235182c615f324ecb4ed8a592ab38
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 3 19:08:35 2018 -0700

    [TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Currently, when running the table service in standalone mode, it occasionally throws `ConcurrentModificationException`, because
    multiple components share the same underlying composite configuration. This causes contention between copying keys and retrieving keys.
    
    *Solution*
    
    Change the component configuration to use a concurrent-hashmap-based configuration. Also change the `loadConf` behavior to copy the keys / sub-keys
    into the component configuration. This prevents components from sharing the same underlying composite configuration.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1378 from sijie/fix_conf
---
 .../common/conf/ComponentConfiguration.java        | 41 +++++++++++---------
 .../common/conf/ConcurrentConfiguration.java       | 28 +++++++-------
 .../common/config/ConcurrentBaseConfiguration.java | 45 +---------------------
 .../DistributedLogConfiguration.java               | 14 +++++--
 .../bookkeeper/stream/cluster/StreamCluster.java   | 24 ++++++------
 .../stream/server/conf/DLConfiguration.java        |  4 +-
 .../server/conf/StorageServerConfiguration.java    |  2 +-
 .../server/service/DLNamespaceProviderService.java |  6 ++-
 .../stream/storage/conf/StorageConfiguration.java  |  2 +-
 9 files changed, 70 insertions(+), 96 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
index 3ebb4a4..7d22443 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
@@ -37,25 +37,26 @@ public abstract class ComponentConfiguration implements Configuration {
     protected static final String DELIMITER = ".";
 
     private final String componentPrefix;
-    private final CompositeConfiguration conf;
+    private final CompositeConfiguration underlyingConf;
+    private final Configuration conf;
 
-    protected ComponentConfiguration(CompositeConfiguration conf,
+    protected ComponentConfiguration(CompositeConfiguration underlyingConf,
                                      String componentPrefix) {
         super();
-        this.conf = conf;
+        this.underlyingConf = underlyingConf;
+        this.conf = new ConcurrentConfiguration();
         this.componentPrefix = componentPrefix;
-    }
 
-    protected String getKeyName(String name) {
-        return this.componentPrefix + name;
+        // load the component keys
+        loadConf(underlyingConf);
     }
 
-    public String getComponentPrefix() {
-        return componentPrefix;
+    protected String getKeyName(String name) {
+        return name;
     }
 
     public CompositeConfiguration getUnderlyingConf() {
-        return conf;
+        return underlyingConf;
     }
 
     /**
@@ -66,7 +67,16 @@ public abstract class ComponentConfiguration implements Configuration {
      */
     public void loadConf(URL confURL) throws ConfigurationException {
         Configuration loadedConf = new PropertiesConfiguration(confURL);
-        conf.addConfiguration(loadedConf);
+        loadConf(loadedConf);
+    }
+
+    protected void loadConf(Configuration loadedConf) {
+        loadedConf.getKeys().forEachRemaining(fullKey -> {
+            if (fullKey.startsWith(componentPrefix)) {
+                String componentKey = fullKey.substring(componentPrefix.length());
+                setProperty(componentKey, loadedConf.getProperty(fullKey));
+            }
+        });
     }
 
     public void validate() throws ConfigurationException {
@@ -80,7 +90,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public boolean isEmpty() {
-        return conf.subset(componentPrefix).isEmpty();
+        return conf.isEmpty();
     }
 
     @Override
@@ -105,12 +115,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public void clear() {
-        Iterator<String> keys = conf.getKeys();
-        keys.forEachRemaining(s -> {
-            if (s.startsWith(componentPrefix)) {
-                conf.clearProperty(s);
-            }
-        });
+        conf.clear();
     }
 
     @Override
@@ -125,7 +130,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public Iterator<String> getKeys() {
-        return conf.getKeys(componentPrefix);
+        return conf.getKeys();
     }
 
     @Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
similarity index 66%
copy from stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
index b390431..72d1504 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
@@ -7,36 +7,38 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.distributedlog.common.config;
+package org.apache.bookkeeper.common.conf;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.configuration.AbstractConfiguration;
 
 /**
  * Configuration view built on concurrent hash map for fast thread-safe access.
- * Notes:
- * 1. Multi-property list aggregation will not work in this class. I.e. commons config
+ *
+ * <p>Notes: Multi-property list aggregation will not work in this class. I.e. commons config
  * normally combines all properties with the same key into one list property automatically.
  * This class simply overwrites any existing mapping.
  */
 @SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
+public class ConcurrentConfiguration extends AbstractConfiguration {
 
-    private final ConcurrentHashMap<String, Object> map;
+    private final ConcurrentMap<String, Object> map;
 
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
+    public ConcurrentConfiguration() {
+        this.map = new ConcurrentHashMap<>();
     }
 
     @Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
index b390431..e617db1 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -17,11 +17,7 @@
  */
 package org.apache.distributedlog.common.config;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.bookkeeper.common.conf.ConcurrentConfiguration;
 
 /**
  * Configuration view built on concurrent hash map for fast thread-safe access.
@@ -31,42 +27,5 @@ import org.apache.commons.configuration.AbstractConfiguration;
  * This class simply overwrites any existing mapping.
  */
 @SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
-
-    private final ConcurrentHashMap<String, Object> map;
-
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
-    }
-
-    @Override
-    protected void addPropertyDirect(String key, Object value) {
-        checkNotNull(value);
-        map.put(key, value);
-    }
-
-    @Override
-    public Object getProperty(String key) {
-        return map.get(key);
-    }
-
-    @Override
-    public Iterator getKeys() {
-        return map.keySet().iterator();
-    }
-
-    @Override
-    public boolean containsKey(String key) {
-        return map.containsKey(key);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    protected void clearPropertyDirect(String key) {
-        map.remove(key);
-    }
+public class ConcurrentBaseConfiguration extends ConcurrentConfiguration {
 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 5b16fe0..4052f4d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -517,16 +517,22 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * @param baseConf Other Configuration
      */
     public void loadConf(DistributedLogConfiguration baseConf) {
-        addConfiguration(baseConf);
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
      * Load configuration from other configuration object.
      *
-     * @param otherConf Other configuration object
+     * @param baseConf Other configuration object
      */
-    public void loadConf(Configuration otherConf) {
-        addConfiguration(otherConf);
+    public void loadConf(Configuration baseConf) {
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 719a109..72a41e5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
@@ -83,9 +82,18 @@ public class StreamCluster
     private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
     private static final String NAMESPACE = "/stream/storage";
 
+    private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+        serverConf.setAllowLoopback(true);
+        serverConf.setGcWaitTime(300000);
+        serverConf.setDiskUsageWarnThreshold(0.9999f);
+        serverConf.setDiskUsageThreshold(0.999999f);
+        return serverConf;
+    }
+
     private final StreamClusterSpec spec;
     private final List<Endpoint> rpcEndpoints;
-    private CompositeConfiguration baseConf;
     private String zkEnsemble;
     private int zkPort;
     private ZooKeeperServerShim zks;
@@ -140,15 +148,6 @@ public class StreamCluster
         log.info("Initializing the stream cluster.");
         ZooKeeper zkc = null;
         try (StorageController controller = new HelixStorageController(zkEnsemble)) {
-            // initialize the configuration
-            ServerConfiguration serverConf = new ServerConfiguration();
-            serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
-            serverConf.setAllowLoopback(true);
-            serverConf.setGcWaitTime(300000);
-            serverConf.setDiskUsageWarnThreshold(0.9999f);
-            serverConf.setDiskUsageThreshold(0.999999f);
-            this.baseConf = serverConf;
-
             zkc = ZooKeeperClient.newBuilder()
                 .connectString(zkEnsemble)
                 .sessionTimeoutMs(60000)
@@ -194,8 +193,7 @@ public class StreamCluster
             }
             LifecycleComponent server = null;
             try {
-                ServerConfiguration serverConf = new ServerConfiguration();
-                serverConf.loadConf(baseConf);
+                ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
                 serverConf.setBookiePort(bookiePort);
                 File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
                 serverConf.setJournalDirName(bkDir.getPath());
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
index db87057..b3cd91d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
@@ -18,11 +18,11 @@ import org.apache.bookkeeper.common.conf.ComponentConfiguration;
 import org.apache.commons.configuration.CompositeConfiguration;
 
 /**
- * A configuration delegates distributedlog configuration {@link org.apache.bookkeeper.DistributedLogConfiguration}.
+ * A configuration delegates distributedlog configuration {@link org.apache.distributedlog.DistributedLogConfiguration}.
  */
 public class DLConfiguration extends ComponentConfiguration {
 
-    public static final String COMPONENT_PREFIX = "dl" + DELIMITER;
+    public static final String COMPONENT_PREFIX = "dlog" + DELIMITER;
 
     public static DLConfiguration of(CompositeConfiguration conf) {
         return new DLConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index d87854a..a0ef963 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
  */
 public class StorageServerConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "rangeserver" + DELIMITER;
+    private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index d65c73e..04d030d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
@@ -68,6 +69,7 @@ public class DLNamespaceProviderService
 
     private final ServerConfiguration bkServerConf;
     private final DistributedLogConfiguration dlConf;
+    private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
     private final URI dlogUri;
     private Namespace namespace;
@@ -81,7 +83,7 @@ public class DLNamespaceProviderService
             ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
         this.bkServerConf = bkServerConf;
         this.dlConf = new DistributedLogConfiguration();
-        ConfUtils.loadConfiguration(this.dlConf, conf, conf.getComponentPrefix());
+        this.dlConf.loadConf(conf);
         // disable write lock
         this.dlConf.setWriteLockEnabled(false);
         // setting the flush policy
@@ -93,6 +95,7 @@ public class DLNamespaceProviderService
         // rolling log segment concurrency is only 1
         this.dlConf.setLogSegmentRollingConcurrency(1);
         this.dlConf.setMaxLogSegmentBytes(256 * 1024 * 1024); // 256 MB
+        this.dlDynConf = ConfUtils.getConstDynConf(dlConf);
     }
 
     @Override
@@ -109,6 +112,7 @@ public class DLNamespaceProviderService
                 .statsLogger(getStatsLogger())
                 .clientId("storage-server")
                 .conf(dlConf)
+                .dynConf(dlDynConf)
                 .uri(uri)
                 .build();
         } catch (Throwable e) {
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index 32208d0..b0f8909 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -24,7 +24,7 @@ public class StorageConfiguration extends ComponentConfiguration {
 
     private static final String COMPONENT_PREFIX = "storage" + DELIMITER;
 
-    private static final String RANGE_STORE_DIRS = "range_store_dirs";
+    private static final String RANGE_STORE_DIRS = "range.store.dirs";
 
     private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 02/04: [TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 71e0604ed60d6e1cd14cef41b7f041d57422dbfc
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 2 21:19:42 2018 -0700

    [TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    A storage container can move between servers due to failures, or it can take time to start on a server. During that short period of time, it is "unavailable" and clients will receive `Status.NOT_FOUND` from grpc channels. The client should retry on this error and attempt to re-locate the storage container again.
    
    *Modification*
    
    - add backoff policy in client settings
    - apply the backoff policy at the storage container channel. If it receives a `NOT_FOUND` gRPC exception, it resets the server channel, so subsequent requests re-attempt to locate the storage container.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1379 from sijie/rpc_backoffs
---
 .../org/apache/bookkeeper/common/util/Backoff.java |  12 +-
 .../bookkeeper/clients/StorageClientImpl.java      |   3 +-
 .../clients/config/StorageClientSettings.java      |  12 ++
 .../impl/container/StorageContainerChannel.java    |  18 +-
 .../clients/impl/internal/LocationClientImpl.java  |   1 -
 .../clients/impl/internal/MetaRangeClientImpl.java |  15 +-
 .../clients/impl/internal/RootRangeClientImpl.java |  11 +-
 .../internal/RootRangeClientImplWithRetries.java   | 132 ++++++++++++
 .../internal/StorageServerClientManagerImpl.java   |  10 +-
 .../internal/mr/MetaRangeRequestProcessor.java     |  11 +-
 .../bookkeeper/clients/utils/ClientConstants.java  |  16 ++
 .../utils/ListenableFutureRpcProcessor.java        |  44 ++--
 .../apache/bookkeeper/clients/utils/RpcUtils.java  |  13 ++
 .../RootRangeClientImplWithRetriesTest.java        | 173 +++++++++++++++
 .../utils/ListenableFutureRpcProcessorTest.java    | 236 +++++++++++++++++++++
 .../bookkeeper/clients/utils/RpcUtilsTest.java     |  50 +++++
 .../java/base/src/test/resources/log4j.properties  |  51 +++++
 .../clients/impl/kv/PByteBufTableImpl.java         |   7 +-
 .../clients/impl/kv/PByteBufTableRangeImpl.java    |  21 +-
 .../clients/impl/kv/TableRequestProcessor.java     |  12 +-
 .../clients/impl/kv/TableRequestProcessorTest.java |   4 +-
 .../clients/impl/kv/TestPByteBufTableImpl.java     |   4 +-
 22 files changed, 811 insertions(+), 45 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
index 085e4cd..fdf8055 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 import lombok.Data;
 import lombok.ToString;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
 
 /**
  * Implements various backoff strategies.
@@ -34,6 +35,12 @@ import lombok.ToString;
  */
 public class Backoff {
 
+    public static final Policy DEFAULT = Jitter.of(
+        Type.EXPONENTIAL,
+        200,
+        2000,
+        3);
+
     private static final int MaxBitShift = 62;
 
     /**
@@ -95,7 +102,10 @@ public class Backoff {
     @ToString
     public static class Jitter implements Policy {
 
-        enum Type {
+        /**
+         * Jitter type.
+         */
+        public enum Type {
             DECORRELATED,
             EQUAL,
             EXPONENTIAL
diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 0cde4e2..aff322c 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -98,7 +98,8 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli
                     streamName,
                     props,
                     serverManager,
-                    scheduler.chooseThread(props.getStreamId())
+                    scheduler.chooseThread(props.getStreamId()),
+                    settings.backoffPolicy()
                 ).initialize();
             }),
             future
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 825e1d3..20b5821 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.inferred.freebuilder.FreeBuilder;
 
@@ -80,6 +82,15 @@ public interface StorageClientSettings {
     Optional<String> clientName();
 
     /**
+     * Configure a backoff policy for the client.
+     *
+     * <p>There are a few default backoff policies defined in {@link org.apache.bookkeeper.common.util.Backoff}.
+     *
+     * @return backoff policy provider
+     */
+    Backoff.Policy backoffPolicy();
+
+    /**
      * Builder of {@link StorageClientSettings} instances.
      */
     class Builder extends StorageClientSettings_Builder {
@@ -87,6 +98,7 @@ public interface StorageClientSettings {
         Builder() {
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
+            // default to the exponential jittered policy that has no retry limit; callers may override it
             backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         }
 
         @Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 4006776..8635e0f 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -78,11 +78,25 @@ public class StorageContainerChannel {
         return rsChannelFuture;
     }
 
-    @VisibleForTesting
-    synchronized void resetStorageServerChannelFuture() {
+    /**
+     * Unconditionally clears the cached storage server channel future.
+     *
+     * <p>Callers use this after a {@code NOT_FOUND} failure so that subsequent requests re-locate the
+     * storage container instead of reusing a channel to a server that no longer serves it.
+     */
+    public synchronized void resetStorageServerChannelFuture() {
         rsChannelFuture = null;
     }
 
+    public synchronized boolean resetStorageServerChannelFuture(CompletableFuture<StorageServerChannel> oldFuture) {
+        if (oldFuture != null) {
+            // we only reset the channel that we expect to reset
+            if (rsChannelFuture == oldFuture) {
+                rsChannelFuture = null;
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            rsChannelFuture = null;
+            return true;
+        }
+    }
+
     @VisibleForTesting
     public synchronized void setStorageServerChannelFuture(CompletableFuture<StorageServerChannel> rsChannelFuture) {
         this.rsChannelFuture = rsChannelFuture;
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
index fa02004..95cae9a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
@@ -105,7 +105,6 @@ public class LocationClientImpl implements LocationClient {
             }
             switch (status.getCode()) {
                 case INVALID_ARGUMENT:
-                case NOT_FOUND:
                 case ALREADY_EXISTS:
                 case PERMISSION_DENIED:
                 case UNAUTHENTICATED:
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index c84caeb..10481f1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -29,6 +29,8 @@ import org.apache.bookkeeper.clients.impl.container.StorageContainerChannelManag
 import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
 import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
 import org.apache.bookkeeper.clients.impl.internal.mr.MetaRangeRequestProcessor;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 
@@ -41,13 +43,23 @@ class MetaRangeClientImpl implements MetaRangeClient {
     private final StreamProperties streamProps;
     private final ScheduledExecutorService executor;
     private final StorageContainerChannel scClient;
+    private final Backoff.Policy backoffPolicy;
 
+    /**
+     * Creates a meta range client that retries requests using
+     * {@link ClientConstants#DEFAULT_INFINIT_BACKOFF_POLICY}.
+     */
     MetaRangeClientImpl(StreamProperties streamProps,
                         OrderedScheduler scheduler,
                         StorageContainerChannelManager channelManager) {
+        this(streamProps, scheduler, channelManager, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+
+    }
+
+    /**
+     * Creates a meta range client.
+     *
+     * @param streamProps properties of the stream whose meta range is accessed
+     * @param scheduler scheduler from which the executor is chosen by the stream id
+     * @param channelManager manager used to obtain the channel of the stream's storage container
+     * @param backoffPolicy backoff policy applied when retrying meta range requests
+     */
+    MetaRangeClientImpl(StreamProperties streamProps,
+                        OrderedScheduler scheduler,
+                        StorageContainerChannelManager channelManager,
+                        Backoff.Policy backoffPolicy) {
         this.streamProps = streamProps;
         this.executor = scheduler.chooseThread(streamProps.getStreamId());
         this.scClient = channelManager.getOrCreate(streamProps.getStorageContainerId());
+        this.backoffPolicy = backoffPolicy;
     }
 
     @Override
@@ -71,7 +83,8 @@ class MetaRangeClientImpl implements MetaRangeClient {
                 streamProps),
             (response) -> createActiveRanges(response.getGetActiveRangesResp()),
             scClient,
-            executor
+            executor,
+            backoffPolicy
         ).process();
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
index 51ea9e3..72bb0e1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.clients.impl.internal;
 
 import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createRootRangeException;
+import static org.apache.bookkeeper.clients.utils.RpcUtils.isContainerNotFound;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
@@ -79,7 +80,15 @@ class RootRangeClientImpl implements RootRangeClient {
         ProcessRequestFunc<ReqT, RespT, RootRangeServiceFutureStub> processRequestFunc,
         ProcessResponseFunc<RespT, T> processResponseFunc) {
 
-        CompletableFuture<T> result = FutureUtils.createFuture();
+        CompletableFuture<T> result = FutureUtils.<T>createFuture()
+            .whenComplete((v, cause) -> {
+                if (null != cause && isContainerNotFound(cause)) {
+                    // if the rpc fails with `NOT_FOUND`, it means the storage container is not owned by any servers
+                    // yet. in this case, reset the storage server channel, this allows subsequent retries will be
+                    // forced to re-locate the containers.
+                    scClient.resetStorageServerChannelFuture();
+                }
+            });
         scClient.getStorageContainerChannelFuture().whenComplete((rsChannel, cause) -> {
             if (null != cause) {
                 handleGetRootRangeServiceFailure(result, cause);
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
new file mode 100644
index 0000000..d83d5a7
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+
+/**
+ * A {@link RootRangeClient} wrapper that retries failed root range rpcs.
+ *
+ * <p>Each operation is delegated to the wrapped client. When the returned future fails and
+ * {@link #ROOT_RANGE_CLIENT_RETRY_PREDICATE} accepts the failure, the operation is re-issued through
+ * {@link Retries#run} using the delays produced by the configured {@link Backoff.Policy}.
+ */
+class RootRangeClientImplWithRetries implements RootRangeClient {
+
+    /**
+     * Decides whether a failed root range rpc should be retried.
+     *
+     * <p>gRPC status failures are retried unless their code is {@code INVALID_ARGUMENT}, {@code ALREADY_EXISTS},
+     * {@code PERMISSION_DENIED} or {@code UNAUTHENTICATED}. Other unchecked exceptions are not retried;
+     * everything else is.
+     */
+    @VisibleForTesting
+    static final Predicate<Throwable> ROOT_RANGE_CLIENT_RETRY_PREDICATE =
+        RootRangeClientImplWithRetries::shouldRetryOnException;
+
+    private static boolean shouldRetryOnException(Throwable cause) {
+        // Future composition may wrap the real failure in a CompletionException/ExecutionException. Classify the
+        // underlying failure; otherwise a retryable gRPC status would be mistaken for a plain RuntimeException.
+        Throwable failure = unwrap(cause);
+        if (failure instanceof StatusRuntimeException || failure instanceof StatusException) {
+            Status status;
+            if (failure instanceof StatusException) {
+                status = ((StatusException) failure).getStatus();
+            } else {
+                status = ((StatusRuntimeException) failure).getStatus();
+            }
+            switch (status.getCode()) {
+                case INVALID_ARGUMENT:
+                case ALREADY_EXISTS:
+                case PERMISSION_DENIED:
+                case UNAUTHENTICATED:
+                    return false;
+                default:
+                    return true;
+            }
+        } else if (failure instanceof RuntimeException) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * Strips {@link CompletionException} / {@link ExecutionException} wrappers that carry a cause.
+     */
+    private static Throwable unwrap(Throwable cause) {
+        Throwable current = cause;
+        while ((current instanceof CompletionException || current instanceof ExecutionException)
+            && null != current.getCause()) {
+            current = current.getCause();
+        }
+        return current;
+    }
+
+    private final RootRangeClient client;
+    private final Backoff.Policy backoffPolicy;
+    private final OrderedScheduler scheduler;
+
+    /**
+     * Creates a retrying wrapper.
+     *
+     * @param client the root range client that actually issues the rpcs
+     * @param backoffPolicy policy producing the delays between retries
+     * @param scheduler scheduler handed to {@link Retries#run} for the retries
+     */
+    RootRangeClientImplWithRetries(RootRangeClient client,
+                                   Backoff.Policy backoffPolicy,
+                                   OrderedScheduler scheduler) {
+        this.client = client;
+        this.backoffPolicy = backoffPolicy;
+        this.scheduler = scheduler;
+    }
+
+    /**
+     * Runs the rpc produced by {@code futureSupplier}; the supplier is invoked again for every attempt.
+     */
+    private <T> CompletableFuture<T> runRpcWithRetries(
+            Supplier<CompletableFuture<T>> futureSupplier) {
+        // a fresh backoff sequence per call, so every operation gets its own retry budget
+        return Retries.run(
+            backoffPolicy.toBackoffs(),
+            ROOT_RANGE_CLIENT_RETRY_PREDICATE,
+            futureSupplier,
+            scheduler);
+    }
+
+    @Override
+    public CompletableFuture<NamespaceProperties> createNamespace(String namespace,
+                                                                  NamespaceConfiguration nsConf) {
+        return runRpcWithRetries(() -> client.createNamespace(namespace, nsConf));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteNamespace(String namespace) {
+        return runRpcWithRetries(() -> client.deleteNamespace(namespace));
+    }
+
+    @Override
+    public CompletableFuture<NamespaceProperties> getNamespace(String namespace) {
+        return runRpcWithRetries(() -> client.getNamespace(namespace));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> createStream(String nsName,
+                                                            String streamName,
+                                                            StreamConfiguration streamConf) {
+        return runRpcWithRetries(() ->
+            client.createStream(nsName, streamName, streamConf));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteStream(String nsName, String streamName) {
+        return runRpcWithRetries(() ->
+            client.deleteStream(nsName, streamName));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> getStream(String nsName, String streamName) {
+        return runRpcWithRetries(() ->
+            client.getStream(nsName, streamName));
+    }
+
+    @Override
+    public CompletableFuture<StreamProperties> getStream(long streamId) {
+        return runRpcWithRetries(() ->
+            client.getStream(streamId));
+    }
+}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index 0e64abe..d219b6a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -79,9 +79,13 @@ public class StorageServerClientManagerImpl
             this.channelManager,
             this.locationClient,
             scheduler);
-        this.rootRangeClient = new RootRangeClientImpl(
-            scheduler,
-            scChannelManager);
+        this.rootRangeClient = new RootRangeClientImplWithRetries(
+            new RootRangeClientImpl(
+                scheduler,
+                scChannelManager),
+            settings.backoffPolicy(),
+            scheduler
+        );
         this.streamMetadataCache = new StreamMetadataCache(rootRangeClient);
         this.metaRangeClients = Maps.newConcurrentMap();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index f7f2480..cb25e84 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -41,8 +42,9 @@ public class MetaRangeRequestProcessor<RespT>
         StorageContainerRequest request,
         Function<StorageContainerResponse, T> responseFunc,
         StorageContainerChannel channel,
-        ScheduledExecutorService executor) {
-        return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor);
+        ScheduledExecutorService executor,
+        Backoff.Policy backoffPolicy) {
+        return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
     private final StorageContainerRequest request;
@@ -51,8 +53,9 @@ public class MetaRangeRequestProcessor<RespT>
+    /**
+     * Creates a processor for a single meta range request.
+     *
+     * @param request the storage container request to send
+     * @param responseFunc function converting the storage container response into the result
+     * @param channel channel to the storage container that serves the request
+     * @param executor executor on which the request is processed and its retries are scheduled
+     * @param backoffPolicy backoff policy applied when retrying the request
+     */
     private MetaRangeRequestProcessor(StorageContainerRequest request,
                                       Function<StorageContainerResponse, RespT> responseFunc,
                                       StorageContainerChannel channel,
-                                      ScheduledExecutorService executor) {
-        super(channel, executor);
+                                      ScheduledExecutorService executor,
+                                      Backoff.Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
         this.request = request;
         this.responseFunc = responseFunc;
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
index 1fe486d..42b8fec 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
@@ -18,6 +18,10 @@
 
 package org.apache.bookkeeper.clients.utils;
 
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+
 /**
  * Client related constants.
  */
@@ -32,5 +36,17 @@ public final class ClientConstants {
     public static final int DEFAULT_BACKOFF_START_MS = 200;
     public static final int DEFAULT_BACKOFF_MAX_MS = 1000;
     public static final int DEFAULT_BACKOFF_MULTIPLIER = 2;
+    public static final int DEFAULT_BACKOFF_RETRIES = 3;
+
+    public static final Policy DEFAULT_BACKOFF_POLICY = Jitter.of(
+        Type.EXPONENTIAL,
+        DEFAULT_BACKOFF_START_MS,
+        DEFAULT_BACKOFF_MAX_MS,
+        DEFAULT_BACKOFF_RETRIES);
+
+    public static final Policy DEFAULT_INFINIT_BACKOFF_POLICY = Jitter.of(
+        Type.EXPONENTIAL,
+        DEFAULT_BACKOFF_START_MS,
+        DEFAULT_BACKOFF_MAX_MS);
 
 }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
index af49a9b..28ad7ca 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
@@ -33,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
 
 /**
  * A process for processing rpc request on storage container channel.
@@ -44,27 +44,22 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     FutureCallback<ResponseT>,
     Runnable {
 
-    private static final long startBackoffMs = 200;
-    private static final long maxBackoffMs = 2000;
-    private static final int maxRetries = 3;
-
     private final StorageContainerChannel scChannel;
     private final Iterator<Long> backoffs;
     private final ScheduledExecutorService executor;
     private final CompletableFuture<ResultT> resultFuture;
 
+    private CompletableFuture<StorageServerChannel> serverChannelFuture = null;
+
+    /**
+     * Creates a processor for one rpc request on a storage container channel.
+     *
+     * @param channel channel to the storage container that serves the request
+     * @param executor executor on which the request is processed and its retries are scheduled
+     * @param backoffPolicy policy producing the delays between retries of this request
+     */
     protected ListenableFutureRpcProcessor(StorageContainerChannel channel,
-                                           ScheduledExecutorService executor) {
+                                           ScheduledExecutorService executor,
+                                           Policy backoffPolicy) {
         this.scChannel = channel;
-        this.backoffs = configureBackoffs();
+        // one backoff iterator per processor: it is consumed by the retries of this request only
+        this.backoffs = backoffPolicy.toBackoffs().iterator();
         this.resultFuture = FutureUtils.createFuture();
         this.executor = executor;
     }
 
-    protected Iterator<Long> configureBackoffs() {
-        return Backoff.exponentialJittered(startBackoffMs, maxBackoffMs).limit(maxRetries).iterator();
-    }
-
     /**
      * Create the rpc request for the processor.
      *
@@ -88,7 +83,8 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     protected abstract ResultT processResponse(ResponseT response) throws Exception;
 
+    /**
+     * Starts processing the rpc request once the channel to the storage container's server is available.
+     *
+     * @return a future completed with the processed result, or exceptionally on failure
+     */
     public CompletableFuture<ResultT> process() {
-        scChannel.getStorageContainerChannelFuture().whenCompleteAsync(this, executor);
+        // remember the channel future so a NOT_FOUND failure resets only this channel, not a newer one
+        serverChannelFuture = scChannel.getStorageContainerChannelFuture();
+        serverChannelFuture.whenCompleteAsync(this, executor);
         return resultFuture;
     }
 
@@ -106,7 +102,9 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
     @Override
     public void accept(StorageServerChannel storageServerChannel, Throwable cause) {
         if (null != cause) {
-            // failure to retrieve a channel to the server that hosts this storage container
+            // The `StorageContainerChannel` already retry on failures related to server channel,
+            // So we don't need to retry here if failed to retrieve a channel to the server
+            // that hosts this storage container.
             resultFuture.completeExceptionally(cause);
             return;
         }
@@ -141,14 +139,26 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
 
     @Override
     public void onFailure(Throwable t) {
-        boolean shouldRetry = false;
+        Status status = null;
         if (t instanceof StatusRuntimeException) {
-            shouldRetry = shouldRetryOn(((StatusRuntimeException) t).getStatus());
+            status = ((StatusRuntimeException) t).getStatus();
         } else if (t instanceof StatusException) {
-            shouldRetry = shouldRetryOn(((StatusException) t).getStatus());
+            status = ((StatusException) t).getStatus();
+        }
+
+        if (Status.NOT_FOUND == status) {
+            // `NOT_FOUND` means storage container is not found. that means:
+            //
+            // - the container is moved to a different server
+            // - the container is not assigned to any servers yet
+            // - the container is assigned, but it is still starting up and not ready for serving
+            //
+            // at either case, we need to reset the storage server channel, so next retry can attempt to re-locate
+            // the storage container again
+            scChannel.resetStorageServerChannelFuture(serverChannelFuture);
         }
 
-        if (shouldRetry && backoffs.hasNext()) {
+        if (shouldRetryOn(status) && backoffs.hasNext()) {
             long backoffMs = backoffs.next();
             executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS);
         } else {
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
index d05bfff..46ca205 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
@@ -17,6 +17,9 @@ package org.apache.bookkeeper.clients.utils;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -48,6 +51,16 @@ public class RpcUtils {
         void process(RespT resp, CompletableFuture<T> resultFuture);
     }
 
+    /**
+     * Checks whether an rpc failure indicates that the storage container was not found.
+     *
+     * <p>The check compares {@link Status.Code}s rather than {@link Status} instances. gRPC does not define
+     * equality on {@link Status}, and a status carrying a description is a different object from
+     * {@link Status#NOT_FOUND}, so an identity comparison would miss it.
+     *
+     * @param cause the failure of the rpc
+     * @return true if {@code cause} is a gRPC status exception with code {@code NOT_FOUND}, otherwise false
+     */
+    public static boolean isContainerNotFound(Throwable cause) {
+        final Status status;
+        if (cause instanceof StatusRuntimeException) {
+            status = ((StatusRuntimeException) cause).getStatus();
+        } else if (cause instanceof StatusException) {
+            status = ((StatusException) cause).getStatus();
+        } else {
+            return false;
+        }
+        return Status.Code.NOT_FOUND == status.getCode();
+    }
+
     public static <T, ReqT, RespT, ServiceT> void processRpc(
         ServiceT service,
         CompletableFuture<T> result,
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
new file mode 100644
index 0000000..fa16002
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RootRangeClientImplWithRetries}.
+ *
+ * <p>Each test stubs the wrapped client to fail with {@code NOT_FOUND} until the last allowed attempt,
+ * then verifies that the wrapper eventually returns the successful result.
+ */
+public class RootRangeClientImplWithRetriesTest {
+
+    private static final int NUM_RETRIES = 3;
+
+    private static final String NS_NAME = "test-namespace";
+    private static final NamespaceConfiguration NS_CONF = NamespaceConfiguration.newBuilder().build();
+    private static final NamespaceProperties NS_PROPS = NamespaceProperties.newBuilder().build();
+    private static final String STREAM_NAME = "test-stream";
+    private static final StreamConfiguration STREAM_CONF = StreamConfiguration.newBuilder().build();
+    private static final StreamProperties STREAM_PROPS = StreamProperties.newBuilder().build();
+
+    private AtomicInteger callCounter;
+    private RootRangeClient client;
+    private OrderedScheduler scheduler;
+    private RootRangeClientImplWithRetries clientWithRetries;
+
+    @Before
+    public void setup() {
+        this.callCounter = new AtomicInteger(NUM_RETRIES);
+        this.client = mock(RootRangeClient.class);
+        this.scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+        this.clientWithRetries = new RootRangeClientImplWithRetries(
+            client,
+            Backoff.Constant.of(10, NUM_RETRIES),
+            scheduler);
+    }
+
+    @After
+    public void teardown() {
+        // release the scheduler threads created in setup(), otherwise they leak across test cases
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+    }
+
+    @Test
+    public void testCreateNamespace() throws Exception {
+        when(client.createNamespace(anyString(), any(NamespaceConfiguration.class)))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(NS_PROPS);
+                }
+            });
+
+        assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.createNamespace(NS_NAME, NS_CONF)));
+    }
+
+    @Test
+    public void testDeleteNamespace() throws Exception {
+        when(client.deleteNamespace(anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(true);
+                }
+            });
+
+        assertTrue(FutureUtils.result(clientWithRetries.deleteNamespace(NS_NAME)));
+    }
+
+    @Test
+    public void testGetNamespace() throws Exception {
+        when(client.getNamespace(anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(NS_PROPS);
+                }
+            });
+
+        assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.getNamespace(NS_NAME)));
+    }
+
+    @Test
+    public void testNonRetryableFailureIsNotRetried() throws Exception {
+        AtomicInteger invocations = new AtomicInteger(0);
+        when(client.getNamespace(anyString()))
+            .thenAnswer(invocationOnMock -> {
+                invocations.incrementAndGet();
+                return FutureUtils.exception(new StatusRuntimeException(Status.INVALID_ARGUMENT));
+            });
+
+        boolean failed = false;
+        try {
+            FutureUtils.result(clientWithRetries.getNamespace(NS_NAME));
+        } catch (Exception e) {
+            failed = true;
+        }
+        assertTrue("rpc should fail with a non-retryable status", failed);
+        // INVALID_ARGUMENT is rejected by the retry predicate, so the rpc is issued exactly once
+        assertEquals(1, invocations.get());
+    }
+
+    @Test
+    public void testCreateStream() throws Exception {
+        when(client.createStream(anyString(), anyString(), any(StreamConfiguration.class)))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.createStream(NS_NAME, STREAM_NAME, STREAM_CONF)));
+    }
+
+    @Test
+    public void testDeleteStream() throws Exception {
+        when(client.deleteStream(anyString(), anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(true);
+                }
+            });
+
+        assertTrue(FutureUtils.result(clientWithRetries.deleteStream(NS_NAME, STREAM_NAME)));
+    }
+
+    @Test
+    public void testGetStream() throws Exception {
+        when(client.getStream(anyString(), anyString()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(NS_NAME, STREAM_NAME)));
+    }
+
+    @Test
+    public void testGetStreamById() throws Exception {
+        when(client.getStream(anyLong()))
+            .thenAnswer(invocationOnMock -> {
+                if (callCounter.decrementAndGet() > 0) {
+                    return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+                } else {
+                    return FutureUtils.value(STREAM_PROPS);
+                }
+            });
+
+        assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(1234L)));
+    }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
new file mode 100644
index 0000000..2c229cb
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ListenableFutureRpcProcessor}.
+ *
+ * <p>The processor under test is a Mockito spy whose hooks ({@code createRequest},
+ * {@code sendRPC}, {@code processResponse}) are stubbed by the individual tests. The storage
+ * container channel future and the rpc futures are completed by hand to drive each scenario.
+ */
+public class ListenableFutureRpcProcessorTest {
+
+    private ListenableFutureRpcProcessor<String, String, String> processor;
+    private StorageContainerChannel scChannel;
+    private ScheduledExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        scChannel = mock(StorageContainerChannel.class);
+        processor = spy(new ListenableFutureRpcProcessor<String, String, String>(
+            scChannel, executor, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY) {
+            @Override
+            protected String createRequest() {
+                return null;
+            }
+
+            @Override
+            protected ListenableFuture<String> sendRPC(StorageServerChannel rsChannel, String s) {
+                return null;
+            }
+
+            @Override
+            protected String processResponse(String response) throws Exception {
+                return null;
+            }
+        });
+    }
+
+    @After
+    public void teardown() {
+        // setup() creates a fresh scheduler thread for every test. Stop it, and drop any
+        // still-scheduled tasks, so threads do not accumulate across tests.
+        if (null != executor) {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testFailToConnect() {
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // inject channel failure
+        Exception testExc = new Exception("test-exception");
+        serverFuture.completeExceptionally(testExc);
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should fail the process if failed to connect to storage server");
+        } catch (Exception e) {
+            assertSame(testExc, e);
+        }
+    }
+
+    @Test
+    public void testProcessSuccessfully() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future to return the response
+        rpcFuture.set(response);
+
+        assertEquals(result, resultFuture.get());
+    }
+
+    @Test
+    public void testProcessResponseException() throws Exception {
+        String request = "request";
+        String response = "response";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        Exception testException = new Exception("test-exception");
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenThrow(testException);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future to return the response
+        rpcFuture.set(response);
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should throw exception on processing result");
+        } catch (Exception e) {
+            assertSame(testException, e);
+        }
+    }
+
+    @Test
+    public void testProcessRpcException() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        SettableFuture<String> rpcFuture = SettableFuture.create();
+
+        // mock the process method
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+
+        CompletableFuture<String> resultFuture = processor.process();
+        verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        // complete the rpc future with `Status.INTERNAL`
+        rpcFuture.setException(new StatusRuntimeException(Status.INTERNAL));
+
+        try {
+            FutureUtils.result(resultFuture);
+            fail("Should fail immediately if rpc request failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof StatusRuntimeException);
+            StatusRuntimeException sre = (StatusRuntimeException) e;
+            assertEquals(Status.INTERNAL, sre.getStatus());
+        }
+    }
+
+    @Test
+    public void testProcessRetryNotFoundRpcException() throws Exception {
+        String request = "request";
+        String response = "response";
+        String result = "result";
+
+        StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+        AtomicInteger numRpcs = new AtomicInteger(0);
+
+        // mock the process method: the first three rpcs fail with NOT_FOUND and the fourth
+        // one succeeds
+        when(processor.createRequest()).thenReturn(request);
+        when(processor.processResponse(eq(response))).thenReturn(result);
+        when(processor.sendRPC(same(serverChannel), eq(request))).thenAnswer(invocationOnMock -> {
+            SettableFuture<String> rpcFuture = SettableFuture.create();
+            if (numRpcs.getAndIncrement() > 2) {
+                rpcFuture.set(response);
+            } else {
+                rpcFuture.setException(new StatusRuntimeException(Status.NOT_FOUND));
+            }
+            return rpcFuture;
+        });
+
+        CompletableFuture<String> resultFuture = processor.process();
+
+        // complete the server future to return a mock server channel
+        FutureUtils.complete(serverFuture, serverChannel);
+
+        assertEquals(result, FutureUtils.result(resultFuture));
+        // one channel lookup per attempt: the initial try plus three retries
+        verify(scChannel, times(4)).getStorageContainerChannelFuture();
+    }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
new file mode 100644
index 0000000..d97d4e4
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RpcUtils}.
+ *
+ * <p>Checks which exceptions {@link RpcUtils#isContainerNotFound} classifies as
+ * "container not found".
+ */
+public class RpcUtilsTest {
+
+    @Test
+    public void testIsContainerNotFound() {
+        // unchecked gRPC status exceptions
+        assertTrue(RpcUtils.isContainerNotFound(new StatusRuntimeException(Status.NOT_FOUND)));
+        assertFalse(RpcUtils.isContainerNotFound(new StatusRuntimeException(Status.INTERNAL)));
+
+        // checked gRPC status exceptions
+        assertTrue(RpcUtils.isContainerNotFound(new StatusException(Status.NOT_FOUND)));
+        assertFalse(RpcUtils.isContainerNotFound(new StatusException(Status.INTERNAL)));
+
+        // an exception that carries no gRPC status
+        assertFalse(RpcUtils.isContainerNotFound(new Exception("unknown")));
+    }
+
+}
diff --git a/stream/clients/java/base/src/test/resources/log4j.properties b/stream/clients/java/base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9405038
--- /dev/null
+++ b/stream/clients/java/base/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Log to the console only (see below for an optional rolling log file)
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+# Set the bookkeeper level to INFO
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log INFO level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=stream.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
index 4cb7dc1..1bda971 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManage
 import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.router.ByteBufHashRouter;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 
 /**
@@ -157,7 +158,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
     public PByteBufTableImpl(String streamName,
                              StreamProperties props,
                              StorageServerClientManager clientManager,
-                             ScheduledExecutorService executor) {
+                             ScheduledExecutorService executor,
+                             Backoff.Policy backoffPolicy) {
         this(
             streamName,
             props,
@@ -171,7 +173,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
                     executorService,
                     opFactory,
                     resultFactory,
-                    kvFactory),
+                    kvFactory,
+                    backoffPolicy),
             Optional.empty());
     }
 
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 1eb6ec8..03c64f7 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
@@ -58,6 +59,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
     private final OpFactory<ByteBuf, ByteBuf> opFactory;
     private final ResultFactory<ByteBuf, ByteBuf> resultFactory;
     private final KeyValueFactory<ByteBuf, ByteBuf> kvFactory;
+    private final Backoff.Policy backoffPolicy;
 
     PByteBufTableRangeImpl(long streamId,
                            RangeProperties rangeProps,
@@ -65,7 +67,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                            ScheduledExecutorService executor,
                            OpFactory<ByteBuf, ByteBuf> opFactory,
                            ResultFactory<ByteBuf, ByteBuf> resultFactory,
-                           KeyValueFactory<ByteBuf, ByteBuf> kvFactory) {
+                           KeyValueFactory<ByteBuf, ByteBuf> kvFactory,
+                           Backoff.Policy backoffPolicy) {
         this.streamId = streamId;
         this.rangeProps = rangeProps;
         this.scChannel = scChannel;
@@ -73,6 +76,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
         this.opFactory = opFactory;
         this.resultFactory = resultFactory;
         this.kvFactory = kvFactory;
+        this.backoffPolicy = backoffPolicy;
     }
 
     private RoutingHeader.Builder newRoutingHeader(ByteBuf pKey) {
@@ -97,7 +101,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((value, cause) -> {
             pKey.release();
             lKey.release();
@@ -122,7 +127,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -146,7 +152,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -170,7 +177,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     .setHeader(newRoutingHeader(pKey))),
             response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
             scChannel,
-            executor
+            executor,
+            backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
             pKey.release();
             lKey.release();
@@ -246,7 +254,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                     txnBuilder.setHeader(newRoutingHeader(pKey))),
                 response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
                 scChannel,
-                executor
+                executor,
+                backoffPolicy
             ).process().whenComplete((ignored, cause) -> {
                 pKey.release();
                 for (AutoCloseable resource : resourcesToRelease) {
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
index 1b9d4cb..f5e0bc5 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
@@ -18,10 +18,12 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -36,8 +38,9 @@ public class TableRequestProcessor<RespT>
         StorageContainerRequest request,
         Function<StorageContainerResponse, T> responseFunc,
         StorageContainerChannel channel,
-        ScheduledExecutorService executor) {
-        return new TableRequestProcessor<>(request, responseFunc, channel, executor);
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
     private final StorageContainerRequest request;
@@ -46,8 +49,9 @@ public class TableRequestProcessor<RespT>
     private TableRequestProcessor(StorageContainerRequest request,
                                   Function<StorageContainerResponse, RespT> respFunc,
                                   StorageContainerChannel channel,
-                                  ScheduledExecutorService executor) {
-        super(channel, executor);
+                                  ScheduledExecutorService executor,
+                                  Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
         this.request = request;
         this.responseFunc = respFunc;
     }
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
index fc93384..bec76e5 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
@@ -38,6 +38,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
@@ -206,7 +207,8 @@ public class TableRequestProcessorTest extends GrpcClientTestBase {
             request,
             resp -> "test",
             scChannel,
-            scheduler);
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         assertEquals("test", FutureUtils.result(processor.process()));
         assertSame(request, receivedRequest.get());
         assertEquals(type, receivedRequestType.get());
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
index c2adc66..d25c3c9 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
 import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
 import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.router.HashRouter;
 import org.apache.bookkeeper.common.util.Bytes;
@@ -135,7 +136,8 @@ public class TestPByteBufTableImpl {
             runtime.getMethodName(),
             streamProps,
             mockClientManager,
-            scheduler.chooseThread(1));
+            scheduler.chooseThread(1),
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
         try {
             FutureUtils.result(table.initialize());
             fail("Should fail initializing the table with exception " + cause);

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 03/04: ISSUE #1384: Bump checkstyle version

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit c495cf0acc3e9ba818162a298802366472c3cab9
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 3 12:30:54 2018 -0700

    ISSUE #1384: Bump checkstyle version
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    mvn build fails on a centos/7 box. Exceptions are thrown as below:
    
    ```
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:2.17:check (checkstyle) on project circe-checksum: Execution checkstyle of goal org.apache.maven.plugins:maven-checkstyle-plugin:2.17:check failed: An API incompatibility was encountered while executing org.apache.maven.plugins:maven-checkstyle-plugin:2.17:check: java.lang.NoSuchMethodError: org.slf4j.spi.LocationAwareLogger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/String;Ljava/lang/Throwable;)V
    ```
    
    *Solution*
    
    The problem was addressed in https://issues.apache.org/jira/browse/MCHECKSTYLE-335. We need to bump the checkstyle plugin version to `3.0.0`
    
    Master Issue: #1384
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1385 from sijie/vagrant_files, closes #1384
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index cb91944..dd35a99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
     <jacoco-maven-plugin.version>0.8.0</jacoco-maven-plugin.version>
     <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
     <maven-bundle-plugin.version>3.2.0</maven-bundle-plugin.version>
-    <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
+    <maven-checkstyle-plugin.version>3.0.0</maven-checkstyle-plugin.version>
     <maven-clean-plugin.version>2.5</maven-clean-plugin.version>
     <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
     <maven-dependency-plugin.version>3.0.2</maven-dependency-plugin.version>

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 01/04: [TABLE SERVICE] Dlog based checkpoint store

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 7f0d8adcdd1b567dae722d14427e28231ed9c2e5
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Apr 25 02:13:28 2018 -0700

    [TABLE SERVICE] Dlog based checkpoint store
    
    *Motivation*
    
    Currently the table range stores use the local filesystem as a checkpoint store. That is fine in standalone mode,
    but it doesn't work in distributed mode. This change introduces a dlog based checkpoint store to make sure all
    the sst files of table ranges are durably checkpointed into bookkeeper itself.
    
    *Solution*
    
    Introduced a dlog based checkpoint store.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1366 from sijie/dlog_checkpoint_manager
---
 .../impl/metadata/ZKLogStreamMetadataStore.java    |   2 +
 .../org/apache/distributedlog/fs/DLFileSystem.java |  15 +-
 .../bookkeeper/stream/server/StorageServer.java    |   2 +
 .../rocksdb/checkpoint/dlog/DLCheckpointStore.java | 154 +++++++++++++
 .../rocksdb/checkpoint/dlog/DLInputStream.java     | 232 +++++++++++++++++++
 .../rocksdb/checkpoint/dlog/DLOutputStream.java    | 142 ++++++++++++
 .../impl/rocksdb/checkpoint/dlog/package-info.java |  23 ++
 .../checkpoint/dlog/DLCheckpointStoreTest.java     | 256 +++++++++++++++++++++
 .../storage/impl/store/MVCCStoreFactoryImpl.java   |  17 +-
 .../impl/store/MVCCStoreFactoryImplTest.java       |   1 +
 10 files changed, 838 insertions(+), 6 deletions(-)

diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 1e36356..0e923c6 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -873,6 +873,8 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                 } else if (Code.NOTEMPTY.intValue() == rc) {
                     future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
                         "Someone is holding a lock on log " + oldLogPath));
+                } else if (Code.NONODE.intValue() == rc) {
+                    future.completeExceptionally(new LogNotFoundException("Log " + newLogPath + " is not found"));
                 } else {
                     future.completeExceptionally(new ZKException("Failed to rename log "
                         + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
index 1a056c3..7872052 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.distributedlog.DLSN;
@@ -39,6 +40,7 @@ import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.util.Utils;
@@ -315,7 +317,18 @@ public class DLFileSystem extends FileSystem {
     public boolean rename(Path src, Path dst) throws IOException {
         String srcLog = getStreamName(src);
         String dstLog = getStreamName(dst);
-        namespace.renameLog(srcLog, dstLog);
+        // renameLog() completes asynchronously; wait for it so a failed rename surfaces as an exception
+        try {
+            namespace.renameLog(srcLog, dstLog).get();
+        } catch (InterruptedException e) {
+            // restore the interrupt flag before reporting the interruption
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            // rethrow IOException causes unchanged; wrap any other cause in an IOException
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
+        // every failure above is reported by throwing, so reaching here means the rename completed
         return true;
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 69a8e84..b48ea0d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -213,6 +214,7 @@ public class StorageServer {
             .withRangeStoreFactory(
                 new MVCCStoreFactoryImpl(
                     dlNamespaceProvider,
+                    () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                     storageConf.getRangeStoreDirs(),
                     storageResources,
                     storageConf.getServeReadOnlyTables()));
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
new file mode 100644
index 0000000..8dc0447
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogExistsException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * Dlog based checkpoint store.
+ *
+ * <p>Checkpoint files and directories are both mapped to log streams of the given {@link Namespace}:
+ * the children of a "directory" are the logs returned by {@code namespace.getLogs(path)}, and the length
+ * of a "file" is the transaction id of the last record in its log.
+ */
+@Slf4j
+public class DLCheckpointStore implements CheckpointStore {
+
+    // size of the buffers wrapped around the dlog backed input/output streams
+    private static final int STREAM_BUFFER_SIZE = 128 * 1024;
+
+    private final Namespace namespace;
+
+    /**
+     * Create a checkpoint store on top of a dlog namespace.
+     *
+     * @param namespace namespace holding the checkpoint logs; it is closed by {@link #close()}
+     */
+    public DLCheckpointStore(Namespace namespace) {
+        this.namespace = namespace;
+    }
+
+    @Override
+    public List<String> listFiles(String filePath) throws IOException {
+        return Lists.newArrayList(namespace.getLogs(filePath));
+    }
+
+    @Override
+    public boolean fileExists(String filePath) throws IOException {
+        return namespace.logExists(filePath);
+    }
+
+    /**
+     * Return the length of a checkpoint file, i.e. the transaction id of the last record of its log.
+     *
+     * @throws FileNotFoundException if the log does not exist
+     */
+    @Override
+    public long getFileLength(String filePath) throws IOException {
+        try (DistributedLogManager dlm = namespace.openLog(filePath)) {
+            return dlm.getLastTxId();
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(filePath);
+        } catch (LogEmptyException e) {
+            // a log without records is an empty file
+            return 0;
+        }
+    }
+
+    /**
+     * Open a buffered stream reading the checkpoint file from its beginning.
+     *
+     * <p>The returned stream owns the log manager and the reader and closes both when it is closed.
+     * If opening fails, they are closed here instead of being leaked.
+     *
+     * @throws FileNotFoundException if the log does not exist or cannot be read from its start
+     */
+    @Override
+    public InputStream openInputStream(String filePath) throws IOException {
+        DistributedLogManager dlm;
+        try {
+            dlm = namespace.openLog(filePath);
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(filePath);
+        }
+        LogReader reader = null;
+        try {
+            try {
+                reader = dlm.openLogReader(DLSN.InitialDLSN);
+            } catch (LogNotFoundException | LogEmptyException e) {
+                throw new FileNotFoundException(filePath);
+            }
+            return new BufferedInputStream(
+                new DLInputStream(dlm, reader, 0L), STREAM_BUFFER_SIZE);
+        } catch (LogNotFoundException e) {
+            closeQuietly(reader, dlm);
+            throw new FileNotFoundException(filePath);
+        } catch (IOException | RuntimeException e) {
+            closeQuietly(reader, dlm);
+            throw e;
+        }
+    }
+
+    /**
+     * Open a buffered stream writing to the end of the checkpoint file's log.
+     *
+     * <p>The returned stream owns the log manager and the writer. If opening the writer fails, the log
+     * manager is closed here instead of being leaked.
+     *
+     * @throws FileNotFoundException if the log is not found
+     */
+    @Override
+    public OutputStream openOutputStream(String filePath) throws IOException {
+        DistributedLogManager dlm;
+        try {
+            dlm = namespace.openLog(filePath);
+        } catch (LogNotFoundException le) {
+            throw new FileNotFoundException(filePath);
+        }
+        try {
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+            return new BufferedOutputStream(
+                new DLOutputStream(dlm, writer), STREAM_BUFFER_SIZE);
+        } catch (LogNotFoundException le) {
+            closeQuietly(null, dlm);
+            throw new FileNotFoundException(filePath);
+        } catch (IOException | RuntimeException e) {
+            closeQuietly(null, dlm);
+            throw e;
+        }
+    }
+
+    /**
+     * Rename a checkpoint file, waiting for the asynchronous dlog rename to complete.
+     *
+     * @throws FileAlreadyExistsException if dlog reports that the destination log already exists
+     * @throws NoSuchFileException if dlog reports a log as not found
+     */
+    @Override
+    public void rename(String srcLog, String dstLog) throws IOException {
+        log.info("Renaming {} to {}", srcLog, dstLog);
+        try {
+            namespace.renameLog(srcLog, dstLog).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof LogExistsException) {
+                throw new FileAlreadyExistsException("Dest file already exists : " + dstLog);
+            } else if (e.getCause() instanceof LogNotFoundException) {
+                throw new NoSuchFileException("Src file or dest directory is not found");
+            } else if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
+    }
+
+    /**
+     * Delete a log and, depth first, every log listed beneath it.
+     */
+    @Override
+    public void deleteRecursively(String srcPath) throws IOException {
+        Iterator<String> logs = namespace.getLogs(srcPath);
+        while (logs.hasNext()) {
+            String child = logs.next();
+            deleteRecursively(srcPath + "/" + child);
+        }
+        namespace.deleteLog(srcPath);
+    }
+
+    @Override
+    public void delete(String srcPath) throws IOException {
+        namespace.deleteLog(srcPath);
+    }
+
+    /**
+     * Create a "directory", which in dlog is simply a log at the given path.
+     */
+    @Override
+    public void createDirectories(String srcPath) throws IOException {
+        namespace.createLog(srcPath);
+    }
+
+    /**
+     * Close the store and its namespace.
+     *
+     * <p>NOTE(review): the namespace was supplied by the caller; confirm it is not shared with other users.
+     */
+    @Override
+    public void close() {
+        namespace.close();
+    }
+
+    // Best-effort cleanup on a failure path: the original failure is what the caller must see.
+    private static void closeQuietly(LogReader reader, DistributedLogManager dlm) {
+        if (null != reader) {
+            try {
+                reader.close();
+            } catch (IOException ioe) {
+                log.warn("Failed to close log reader", ioe);
+            }
+        }
+        try {
+            dlm.close();
+        } catch (IOException ioe) {
+            log.warn("Failed to close distributedlog manager", ioe);
+        }
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
new file mode 100644
index 0000000..c9e5fa8
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+
+/**
+ * The input stream for a distributedlog stream.
+ *
+ * <p>The log is read as a byte stream: the payloads of consecutive records are concatenated, and the
+ * transaction id of each record is the stream position right after its last byte.
+ */
+@Slf4j
+class DLInputStream extends InputStream {
+
+    // seeking forward by at least this many bytes reopens the reader instead of reading through records
+    private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+    // a record together with a stream over the part of its payload that has not been consumed yet
+    private static class RecordStream {
+
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN record;
+
+        RecordStream(LogRecordWithDLSN record) {
+            checkNotNull(record);
+
+            this.record = record;
+            this.payloadStream = record.getPayLoadInputStream();
+        }
+
+    }
+
+    // Read the next record from the reader; null when the reader returns none.
+    private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null != record) {
+            return new RecordStream(record);
+        }
+        return null;
+    }
+
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    // current position in the byte stream
+    private long pos;
+    // cached end position of the log (its last transaction id), refreshed once pos catches up with it
+    private long lastPos;
+    // record being consumed; null means the next record is fetched on demand
+    private RecordStream currentRecord = null;
+
+    /**
+     * Create an input stream over a log.
+     *
+     * @param dlm log manager; closed when this stream is closed
+     * @param reader reader of the log; closed when this stream is closed
+     * @param startPos position the stream reports as its starting point
+     */
+    DLInputStream(DistributedLogManager dlm,
+                  LogReader reader,
+                  long startPos)
+            throws IOException {
+        this.dlm = dlm;
+        this.reader = reader;
+        this.pos = startPos;
+        this.lastPos = readEndPos();
+        // NOTE(review): pos already equals startPos, so this seek() returns immediately; the reader is
+        // assumed to already be positioned at startPos (DLCheckpointStore passes 0L with a reader opened
+        // at DLSN.InitialDLSN).
+        seek(startPos);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // release everything even if an earlier close fails
+        try {
+            try {
+                closeCurrentRecord();
+            } finally {
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    private long readEndPos() throws IOException {
+        return dlm.getLastTxId();
+    }
+
+    // Close the payload stream of the record being consumed (if any) and forget the record,
+    // as read() does once a record is exhausted.
+    private void closeCurrentRecord() throws IOException {
+        if (null != currentRecord) {
+            currentRecord.payloadStream.close();
+            currentRecord = null;
+        }
+    }
+
+    //
+    // FSInputStream
+    //
+
+    /**
+     * Move the stream to the given position.
+     *
+     * <p>Seeking backwards, or forwards by at least {@code REOPEN_READER_SKIP_BYTES}, reopens the reader
+     * at {@code pos}; shorter forward seeks read through the intermediate records.
+     */
+    public void seek(long pos) throws IOException {
+        if (this.pos == pos) {
+            return;
+        }
+
+        if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+            // close the previous reader
+            closeCurrentRecord();
+            this.reader.close();
+            this.reader = dlm.openLogReader(pos);
+        }
+
+        skipTo(pos);
+    }
+
+    // Advance through records until the stream is at `position`; false if the log ends first.
+    private boolean skipTo(final long position) throws IOException {
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // the stream is empty now
+                return false;
+            }
+
+            long endPos = currentRecord.record.getTransactionId();
+            if (endPos < position) {
+                // the whole record lies before the target position: drop it
+                closeCurrentRecord();
+                this.pos = endPos;
+            } else if (endPos == position) {
+                // found the record, but defer reading the next record until an actual read happens
+                closeCurrentRecord();
+                this.pos = position;
+                return true;
+            } else {
+                // the target lies inside this record: skip the bytes before it
+                this.currentRecord.payloadStream.skip(
+                    this.currentRecord.payloadStream.available() - (endPos - position));
+                this.pos = position;
+                return true;
+            }
+        }
+    }
+
+    //
+    // Input Stream
+    //
+
+    @Override
+    public int read(byte[] b, final int off, final int len) throws IOException {
+        int remaining = len;
+        int numBytesRead = 0;
+        while (remaining > 0) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) {
+                if (numBytesRead == 0) {
+                    return -1;
+                }
+                break;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            if (bytesLeft <= 0) {
+                closeCurrentRecord();
+                continue;
+            }
+
+            int numBytesToRead = Math.min(bytesLeft, remaining);
+            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+            if (numBytes < 0) {
+                // payload exhausted despite available() > 0: move on instead of spinning on this record
+                closeCurrentRecord();
+                continue;
+            }
+            numBytesRead += numBytes;
+            remaining -= numBytes;
+            // keep the stream position in sync so available() and seek() stay correct after reads
+            pos += numBytes;
+        }
+        return numBytesRead;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+
+        long remaining = n;
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // end of stream
+                return n - remaining;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            long endPos = currentRecord.record.getTransactionId();
+            if (remaining > bytesLeft) {
+                // skip the whole record
+                remaining -= bytesLeft;
+                this.pos = endPos;
+                closeCurrentRecord();
+            } else if (remaining == bytesLeft) {
+                this.pos = endPos;
+                closeCurrentRecord();
+                return n;
+            } else {
+                currentRecord.payloadStream.skip(remaining);
+                this.pos = endPos - currentRecord.payloadStream.available();
+                return n;
+            }
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (lastPos - pos == 0L) {
+            lastPos = readEndPos();
+        }
+        // the remaining length is a long; clamp it into the int range required by InputStream
+        return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, lastPos - pos));
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] data = new byte[1];
+        int numBytes = read(data);
+        if (numBytes <= 0) {
+            return -1;
+        }
+        // InputStream#read() must return the byte as an unsigned value in the range 0..255
+        return data[0] & 0xFF;
+    }
+
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
new file mode 100644
index 0000000..8fe6200
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+    private final DistributedLogManager dlm;
+    private final AsyncLogWriter writer;
+
+    // positions
+    private final long[] syncPos = new long[1];
+    private long writePos = 0L;
+
+    // state
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
+    private volatile Throwable exception = null;
+
+    DLOutputStream(DistributedLogManager dlm,
+                   AsyncLogWriter writer) {
+        this.dlm = dlm;
+        this.writer = writer;
+        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+        this.syncPos[0] = writePos;
+    }
+
+    public synchronized long position() {
+        return syncPos[0];
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        byte[] data = new byte[] { (byte) b };
+        write(data);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(Unpooled.wrappedBuffer(b));
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(Unpooled.wrappedBuffer(b, off, len));
+    }
+
+    private synchronized void write(ByteBuf buf) throws IOException {
+        Throwable cause = exceptionUpdater.get(this);
+        if (null != cause) {
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            } else {
+                throw new UnexpectedException("Encountered unknown issue", cause);
+            }
+        }
+
+        writePos += buf.readableBytes();
+        LogRecord record = new LogRecord(writePos, buf);
+        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
+            @Override
+            public void onSuccess(DLSN value) {
+                synchronized (syncPos) {
+                    syncPos[0] = record.getTransactionId();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
+            }
+        });
+    }
+
+    private CompletableFuture<DLSN> writeControlRecord() {
+        LogRecord record;
+        synchronized (this) {
+            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+            record.setControl();
+        }
+        return writer.write(record);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            FutureUtils.result(writeControlRecord());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception e) {
+            log.error("Unexpected exception in DLOutputStream", e);
+            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        Utils.ioResult(
+            writeControlRecord()
+                .thenCompose(ignored -> writer.asyncClose())
+                .thenCompose(ignored -> dlm.asyncClose()));
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
new file mode 100644
index 0000000..697e13a
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * DistributedLog (dlog) based checkpoint store.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
\ No newline at end of file
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
new file mode 100644
index 0000000..33f342d
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link DLCheckpointStore}, the dlog based counterpart of {@link FSCheckpointManager}.
+ */
+public class DLCheckpointStoreTest extends TestDistributedLogBase {
+
+    private static final byte[] TEST_BYTES = "dlog-checkpoint-manager".getBytes(UTF_8);
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private URI uri;
+    private Namespace namespace;
+    private DLCheckpointStore store;
+
+    @BeforeClass
+    public static void setupDL() throws Exception {
+        setupCluster();
+    }
+
+    @AfterClass
+    public static void teardownDL() throws Exception {
+        teardownCluster();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.uri = DLMTestUtil.createDLMURI(zkPort, "/" + runtime.getMethodName());
+        ensureURICreated(this.uri);
+        this.namespace = NamespaceBuilder.newBuilder()
+            .conf(new DistributedLogConfiguration())
+            .uri(uri)
+            .build();
+        this.store = new DLCheckpointStore(namespace);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // closing the store also closes the namespace
+        if (null != store) {
+            store.close();
+        }
+    }
+
+    // Create a parent log named after the test with `numFiles` child logs; returns the child names.
+    private List<String> createChildFiles(int numFiles) throws Exception {
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(numFiles);
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        return expectedFiles;
+    }
+
+    // List the files under `path` in sorted order.
+    private List<String> listFilesSorted(String path) throws Exception {
+        List<String> files = store.listFiles(path);
+        Collections.sort(files);
+        return files;
+    }
+
+    // Write TEST_BYTES to `filePath` through the store's output stream.
+    private void writeTestBytes(String filePath) throws Exception {
+        try (OutputStream os = store.openOutputStream(filePath)) {
+            os.write(TEST_BYTES);
+            os.flush();
+        }
+    }
+
+    // Assert that `filePath` holds exactly TEST_BYTES.
+    private void assertTestBytes(String filePath) throws Exception {
+        assertEquals(TEST_BYTES.length, store.getFileLength(filePath));
+
+        try (InputStream is = store.openInputStream(filePath)) {
+            byte[] readBytes = new byte[TEST_BYTES.length];
+            ByteStreams.readFully(is, readBytes);
+
+            assertArrayEquals(TEST_BYTES, readBytes);
+        }
+    }
+
+    @Test
+    public void testListFilesEmpty() throws Exception {
+        // create a dummy log stream to ensure "dir" exists
+        namespace.createLog(runtime.getMethodName());
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFilesNotFound() throws Exception {
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFiles() throws Exception {
+        List<String> expectedFiles = createChildFiles(3);
+
+        assertEquals(expectedFiles, listFilesSorted(runtime.getMethodName()));
+    }
+
+    @Test
+    public void testFileExists() throws Exception {
+        namespace.createLog(runtime.getMethodName() + "/test");
+        assertTrue(store.fileExists(runtime.getMethodName() + "/test"));
+        assertFalse(store.fileExists(runtime.getMethodName() + "/test2"));
+    }
+
+    @Test
+    public void testFileRename() throws Exception {
+        namespace.createLog("src");
+        namespace.createLog("dest");
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        writeTestBytes(srcFilePath);
+
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertTestBytes(destFilePath);
+    }
+
+    @Test
+    public void testFileRenameDirNotExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        writeTestBytes(srcFilePath);
+
+        // rename will automatically create stream path in dlog
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertTestBytes(destFilePath);
+    }
+
+    @Test
+    public void testFileRenameFileExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        namespace.createLog(destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        writeTestBytes(srcFilePath);
+
+        assertTrue(store.fileExists(srcFilePath));
+
+        try {
+            store.rename(srcFilePath, destFilePath);
+            fail("Should fail to rename if the dest file already exists");
+        } catch (FileAlreadyExistsException e) {
+            // expected
+        }
+        assertTrue(store.fileExists(destFilePath));
+        assertTrue(store.fileExists(srcFilePath));
+        assertEquals(0, store.getFileLength(destFilePath));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        List<String> expectedFiles = createChildFiles(3);
+
+        assertEquals(expectedFiles, listFilesSorted(runtime.getMethodName()));
+
+        store.delete(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+    }
+
+    @Test
+    public void testDeleteRecursively() throws Exception {
+        List<String> expectedFiles = createChildFiles(3);
+
+        assertEquals(expectedFiles, listFilesSorted(runtime.getMethodName()));
+
+        // exercise the recursive variant; this test previously called delete() by mistake
+        store.deleteRecursively(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+        for (String child : expectedFiles) {
+            assertFalse(store.fileExists(runtime.getMethodName() + "/" + child));
+        }
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 2c80a25..28f9955 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -41,7 +41,6 @@ import org.apache.bookkeeper.statelib.StateStores;
 import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
-import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -67,13 +66,15 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
     // dirs
     private final File[] localStateDirs;
     // checkpoint manager
-    private final CheckpointStore checkpointStore;
+    private final Supplier<CheckpointStore> checkpointStoreSupplier;
+    private CheckpointStore checkpointStore;
     // stores
     private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores;
     private final boolean serveReadOnlyTable;
     private boolean closed = false;
 
     public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
+                                Supplier<CheckpointStore> checkpointStoreSupplier,
                                 File[] localStoreDirs,
                                 StorageResources storageResources,
                                 boolean serveReadOnlyTable) {
@@ -86,8 +87,7 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
         this.checkpointScheduler =
             SharedResourceManager.shared().get(storageResources.checkpointScheduler());
         this.localStateDirs = localStoreDirs;
-        // TODO: change this cto dlog based checkpoint manager
-        this.checkpointStore = new FSCheckpointManager(new File(localStoreDirs[0], "checkpoints"));
+        this.checkpointStoreSupplier = checkpointStoreSupplier;
         this.stores = Maps.newHashMap();
         this.serveReadOnlyTable = serveReadOnlyTable;
     }
@@ -185,6 +185,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
             normalizedName(streamId),
             normalizedName(rangeId));
 
+        if (null == checkpointStore) {
+            checkpointStore = checkpointStoreSupplier.get();
+        }
+
         // build a spec
         StateStoreSpec spec = StateStoreSpec.builder()
             .name(storeName)
@@ -247,7 +251,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
         } catch (Exception e) {
             log.info("Encountered issue on closing all the range stores opened by this range factory");
         }
-        checkpointStore.close();
+        if (null != checkpointStore) {
+            checkpointStore.close();
+            checkpointStore = null;
+        }
 
         SharedResourceManager.shared().release(
             storageResources.ioWriteScheduler(), writeIOScheduler);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
index e5c36ee..9372adf 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
@@ -99,6 +99,7 @@ public class MVCCStoreFactoryImplTest {
                 .build());
         this.factory = new MVCCStoreFactoryImpl(
             () -> namespace,
+            () -> new FSCheckpointManager(new File(storeDirs[0], "checkpoints")),
             storeDirs,
             resources,
             false);

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.