You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:39 UTC

[25/31] incubator-distributedlog git commit: DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
index 850f9c8..bcf8129 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -19,8 +19,14 @@ package com.twitter.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
 import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
+import com.twitter.distributedlog.util.Allocator;
+import com.twitter.distributedlog.util.Transaction;
 import com.twitter.util.Future;
 
+import java.io.IOException;
+
 /**
  * Log Segment Store to read log segments
  */
@@ -36,12 +42,14 @@ public interface LogSegmentEntryStore {
     Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
 
     /**
-     * Open the writer for writing data to the log <i>segment</i>.
+     * Create a new log segment allocator for allocating log segment entry writers.
      *
-     * @param segment the log <i>segment</i> to write data to
-     * @return future represent the opened writer
+     * @param metadata the metadata for the log stream
+     * @return future represent the log segment allocator
      */
-    Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment);
+    Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+            LogMetadataForWriter metadata,
+            DynamicDistributedLogConfiguration dynConf) throws IOException;
 
     /**
      * Open the reader for reading data to the log <i>segment</i>.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
deleted file mode 100644
index ac36ef2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.BKDLConfigFormat;
-import com.twitter.distributedlog.util.DLUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Configurations for BookKeeper based DL.
- */
-public class BKDLConfig implements DLConfig {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class);
-
-    private static final int BUFFER_SIZE = 4096;
-    private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
-            new ConcurrentHashMap<URI, DLConfig>();
-
-    public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
-        dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
-        dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
-        if (bkdlConfig.isFederatedNamespace()) {
-            dlConf.setCreateStreamIfNotExists(false);
-            LOG.info("Disabled createIfNotExists for federated namespace.");
-        }
-        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
-                        " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
-                new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
-                        dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
-                        bkdlConfig.isFederatedNamespace() });
-    }
-
-    public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
-        DLConfig dlConfig = cachedDLConfigs.get(uri);
-        if (dlConfig == null) {
-            dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
-            DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig);
-            if (null != oldDLConfig) {
-                dlConfig = oldDLConfig;
-            }
-        }
-        assert (dlConfig instanceof BKDLConfig);
-        return (BKDLConfig)dlConfig;
-    }
-
-    @VisibleForTesting
-    public static void clearCachedDLConfigs() {
-        cachedDLConfigs.clear();
-    }
-
-    private String bkZkServersForWriter;
-    private String bkZkServersForReader;
-    private String bkLedgersPath;
-    private boolean sanityCheckTxnID = true;
-    private boolean encodeRegionID = false;
-    private String dlZkServersForWriter;
-    private String dlZkServersForReader;
-    private String aclRootPath;
-    private Long firstLogSegmentSeqNo;
-    private boolean isFederatedNamespace = false;
-
-    /**
-     * Construct a empty config with given <i>uri</i>.
-     */
-    BKDLConfig(URI uri) {
-        this(DLUtils.getZKServersFromDLUri(uri),
-             DLUtils.getZKServersFromDLUri(uri),
-             null, null, null);
-    }
-
-    /**
-     * The caller should make sure both dl and bk use same zookeeper server.
-     *
-     * @param zkServers
-     *          zk servers used for both dl and bk.
-     * @param ledgersPath
-     *          ledgers path.
-     */
-    @VisibleForTesting
-    public BKDLConfig(String zkServers, String ledgersPath) {
-        this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
-    }
-
-    public BKDLConfig(String dlZkServersForWriter,
-                      String dlZkServersForReader,
-                      String bkZkServersForWriter,
-                      String bkZkServersForReader,
-                      String bkLedgersPath) {
-        this.dlZkServersForWriter = dlZkServersForWriter;
-        this.dlZkServersForReader = dlZkServersForReader;
-        this.bkZkServersForWriter = bkZkServersForWriter;
-        this.bkZkServersForReader = bkZkServersForReader;
-        this.bkLedgersPath = bkLedgersPath;
-    }
-
-    /**
-     * @return zk servers used for bk for writers
-     */
-    public String getBkZkServersForWriter() {
-        return bkZkServersForWriter;
-    }
-
-    /**
-     * @return zk servers used for bk for readers
-     */
-    public String getBkZkServersForReader() {
-        return bkZkServersForReader;
-    }
-
-    /**
-     * @return zk servers used for dl for writers
-     */
-    public String getDlZkServersForWriter() {
-        return dlZkServersForWriter;
-    }
-
-    /**
-     * @return zk servers used for dl for readers
-     */
-    public String getDlZkServersForReader() {
-        return dlZkServersForReader;
-    }
-
-    /**
-     * @return ledgers path for bk
-     */
-    public String getBkLedgersPath() {
-        return bkLedgersPath;
-    }
-
-    /**
-     * Enable/Disable sanity check txn id.
-     *
-     * @param enabled
-     *          flag to enable/disable sanity check txn id.
-     * @return bk dl config.
-     */
-    public BKDLConfig setSanityCheckTxnID(boolean enabled) {
-        this.sanityCheckTxnID = enabled;
-        return this;
-    }
-
-    /**
-     * @return flag to sanity check highest txn id.
-     */
-    public boolean getSanityCheckTxnID() {
-        return sanityCheckTxnID;
-    }
-
-    /**
-     * Enable/Disable encode region id.
-     *
-     * @param enabled
-     *          flag to enable/disable encoding region id.
-     * @return bk dl config
-     */
-    public BKDLConfig setEncodeRegionID(boolean enabled) {
-        this.encodeRegionID = enabled;
-        return this;
-    }
-
-    /**
-     * @return flag to encode region id.
-     */
-    public boolean getEncodeRegionID() {
-        return encodeRegionID;
-    }
-
-    /**
-     * Set the root path of zk based ACL manager.
-     *
-     * @param aclRootPath
-     *          root path of zk based ACL manager.
-     * @return bk dl config
-     */
-    public BKDLConfig setACLRootPath(String aclRootPath) {
-        this.aclRootPath = aclRootPath;
-        return this;
-    }
-
-    /**
-     * Get the root path of zk based ACL manager.
-     *
-     * @return root path of zk based ACL manager.
-     */
-    public String getACLRootPath() {
-        return aclRootPath;
-    }
-
-    /**
-     * Set the value at which ledger sequence number should start for streams that are being
-     * upgraded and did not have ledger sequence number to start with or for newly created
-     * streams
-     *
-     * @param firstLogSegmentSeqNo first ledger sequence number
-     * @return bk dl config
-     */
-    public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
-        this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
-        return this;
-    }
-
-    /**
-     * Get the value at which ledger sequence number should start for streams that are being
-     * upgraded and did not have ledger sequence number to start with or for newly created
-     * streams
-     *
-     * @return first ledger sequence number
-     */
-    public Long getFirstLogSegmentSeqNo() {
-        if (null == firstLogSegmentSeqNo) {
-            return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
-        }
-        return firstLogSegmentSeqNo;
-    }
-
-    /**
-     * Set the namespace to federated <i>isFederatedNamespace</i>.
-     *
-     * @param isFederatedNamespace
-     *          is the namespace federated?
-     * @return bk dl config
-     */
-    public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
-        this.isFederatedNamespace = isFederatedNamespace;
-        return this;
-    }
-
-    /**
-     * Whether the namespace is federated namespace
-     *
-     * @return true if the namespace is a federated namespace. otherwise false.
-     */
-    public boolean isFederatedNamespace() {
-        return this.isFederatedNamespace;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
-                                dlZkServersForWriter, dlZkServersForReader,
-                                bkLedgersPath);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof BKDLConfig)) {
-            return false;
-        }
-        BKDLConfig another = (BKDLConfig) o;
-        return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) &&
-               Objects.equal(bkZkServersForReader, another.bkZkServersForReader) &&
-               Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) &&
-               Objects.equal(dlZkServersForReader, another.dlZkServersForReader) &&
-               Objects.equal(bkLedgersPath, another.bkLedgersPath) &&
-               sanityCheckTxnID == another.sanityCheckTxnID &&
-               encodeRegionID == another.encodeRegionID &&
-               Objects.equal(aclRootPath, another.aclRootPath) &&
-               Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) &&
-               Objects.equal(isFederatedNamespace, another.isFederatedNamespace);
-
-    }
-
-    @Override
-    public String toString() {
-        return serialize();
-    }
-
-    @Override
-    public String serialize() {
-        BKDLConfigFormat configFormat = new BKDLConfigFormat();
-        if (null != bkZkServersForWriter) {
-            configFormat.setBkZkServers(bkZkServersForWriter);
-        }
-        if (null != bkZkServersForReader) {
-            configFormat.setBkZkServersForReader(bkZkServersForReader);
-        }
-        if (null != dlZkServersForWriter) {
-            configFormat.setDlZkServersForWriter(dlZkServersForWriter);
-        }
-        if (null != dlZkServersForReader) {
-            configFormat.setDlZkServersForReader(dlZkServersForReader);
-        }
-        if (null != bkLedgersPath) {
-            configFormat.setBkLedgersPath(bkLedgersPath);
-        }
-        configFormat.setSanityCheckTxnID(sanityCheckTxnID);
-        configFormat.setEncodeRegionID(encodeRegionID);
-        if (null != aclRootPath) {
-            configFormat.setAclRootPath(aclRootPath);
-        }
-        if (null != firstLogSegmentSeqNo) {
-            configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
-        }
-        if (isFederatedNamespace) {
-            configFormat.setFederatedNamespace(true);
-        }
-        return serialize(configFormat);
-    }
-
-    String serialize(BKDLConfigFormat configFormat) {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            configFormat.write(protocol);
-            transport.flush();
-            return transport.toString("UTF-8");
-        } catch (TException e) {
-            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
-        }
-    }
-
-    @Override
-    public void deserialize(byte[] data) throws IOException {
-        BKDLConfigFormat configFormat = new BKDLConfigFormat();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            configFormat.read(protocol);
-        } catch (TException e) {
-            throw new IOException("Failed to deserialize data '" +
-                    new String(data, UTF_8) + "' : ", e);
-        }
-        // bookkeeper cluster settings
-        if (configFormat.isSetBkZkServers()) {
-            bkZkServersForWriter = configFormat.getBkZkServers();
-        }
-        if (configFormat.isSetBkZkServersForReader()) {
-            bkZkServersForReader = configFormat.getBkZkServersForReader();
-        } else {
-            bkZkServersForReader = bkZkServersForWriter;
-        }
-        if (configFormat.isSetBkLedgersPath()) {
-            bkLedgersPath = configFormat.getBkLedgersPath();
-        }
-        // dl zookeeper cluster settings
-        if (configFormat.isSetDlZkServersForWriter()) {
-            dlZkServersForWriter = configFormat.getDlZkServersForWriter();
-        }
-        if (configFormat.isSetDlZkServersForReader()) {
-            dlZkServersForReader = configFormat.getDlZkServersForReader();
-        } else {
-            dlZkServersForReader = dlZkServersForWriter;
-        }
-        // dl settings
-        sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID();
-        encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID();
-        if (configFormat.isSetAclRootPath()) {
-            aclRootPath = configFormat.getAclRootPath();
-        }
-
-        if (configFormat.isSetFirstLogSegmentSeqNo()) {
-            firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo();
-        }
-        isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace();
-
-        // Validate the settings
-        if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath ||
-                null == dlZkServersForWriter || null == dlZkServersForReader) {
-            throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
index e0331c6..c0b5fb7 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
@@ -18,6 +18,7 @@
 package com.twitter.distributedlog.metadata;
 
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
deleted file mode 100644
index 303fbe6..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.net.URI;
-
-public class ZkMetadataResolver implements MetadataResolver {
-
-    private final ZooKeeperClient zkc;
-
-    public ZkMetadataResolver(ZooKeeperClient zkc) {
-        this.zkc = zkc;
-    }
-
-    @Override
-    public DLMetadata resolve(URI uri) throws IOException {
-        String dlPath = uri.getPath();
-        PathUtils.validatePath(dlPath);
-        // Normal case the dl metadata is stored in the last segment
-        // so lookup last segment first.
-        String[] parts = StringUtils.split(dlPath, '/');
-        if (null == parts || 0 == parts.length) {
-            throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath);
-        }
-        for (int i = parts.length; i >= 0; i--) {
-            String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i));
-            byte[] data;
-            try {
-                data = zkc.get().getData(pathToResolve, false, new Stat());
-            } catch (KeeperException.NoNodeException nne) {
-                continue;
-            } catch (KeeperException ke) {
-                throw new IOException("Fail to resolve dl path : " + pathToResolve);
-            } catch (InterruptedException ie) {
-                throw new IOException("Interrupted when resolving dl path : " + pathToResolve);
-            }
-            if (null == data || data.length == 0) {
-                continue;
-            }
-            try {
-                return DLMetadata.deserialize(uri, data);
-            } catch (IOException ie) {
-                throw new IOException("Failed to deserialize uri : " + uri);
-            }
-        }
-        throw new IOException("No bkdl config bound under dl path : " + dlPath);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
index b5abe9f..5d1d888 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
@@ -70,6 +70,13 @@ import org.apache.bookkeeper.stats.StatsLogger;
 @Beta
 public interface DistributedLogNamespace {
 
+    /**
+     * Get the namespace driver used by this namespace.
+     *
+     * @return namespace driver
+     */
+    NamespaceDriver getNamespaceDriver();
+
     //
     // Method to operate logs
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
index a01bb70..07b3848 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
@@ -17,22 +17,29 @@
  */
 package com.twitter.distributedlog.namespace;
 
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.BKDistributedLogNamespace;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.feature.CoreFeatureKeys;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
+import com.twitter.distributedlog.util.ConfUtils;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.PermitLimiter;
+import com.twitter.distributedlog.util.SimplePermitLimiter;
+import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 /**
  * Builder to construct a <code>DistributedLogNamespace</code>.
@@ -50,6 +57,7 @@ public class DistributedLogNamespaceBuilder {
     }
 
     private DistributedLogConfiguration _conf = null;
+    private DynamicDistributedLogConfiguration _dynConf = null;
     private URI _uri = null;
     private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
     private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
@@ -73,6 +81,17 @@ public class DistributedLogNamespaceBuilder {
     }
 
     /**
+     * Dynamic DistributedLog Configuration used for the namespace
+     *
+     * @param dynConf dynamic distributedlog configuration
+     * @return namespace builder
+     */
+    public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) {
+        this._dynConf = dynConf;
+        return this;
+    }
+
+    /**
      * Namespace Location.
      *
      * @param uri
@@ -146,6 +165,18 @@ public class DistributedLogNamespaceBuilder {
         return this;
     }
 
+    @SuppressWarnings("deprecation")
+    private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger,
+                                                          StatsLogger perLogStatsLogger,
+                                                          DistributedLogConfiguration conf) {
+        StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger;
+        if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
+                conf.getEnablePerStreamStat()) {
+            normalizedPerLogStatsLogger = statsLogger.scope("stream");
+        }
+        return normalizedPerLogStatsLogger;
+    }
+
     /**
      * Build the namespace.
      *
@@ -160,25 +191,17 @@ public class DistributedLogNamespaceBuilder {
         Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
         Preconditions.checkNotNull(_uri, "No DistributedLog URI");
 
-        // Validate the uri and load the backend according to scheme
-        String scheme = _uri.getScheme();
-        Preconditions.checkNotNull(scheme, "Invalid DistributedLog URI : " + _uri);
-        String[] schemeParts = StringUtils.split(scheme, '-');
-        Preconditions.checkArgument(schemeParts.length > 0,
-                "Invalid distributedlog scheme found : " + _uri);
-        Preconditions.checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
-                "Unknown distributedlog scheme found : " + _uri);
-
-        // both distributedlog: & distributedlog-bk: use bookkeeper as the backend
-        // TODO: we could do reflection to load backend in future.
-        //       if we are going to support other backends : e.g. 'distributedlog-mem:'.
-        if (schemeParts.length > 1) {
-            String backendProvider = schemeParts[1];
-            Preconditions.checkArgument(Objects.equal(DistributedLogConstants.BACKEND_BK, backendProvider.toLowerCase()),
-                    "Backend '" + backendProvider + "' is not supported yet.");
+        // validate the configuration
+        _conf.validate();
+        if (null == _dynConf) {
+            _dynConf = ConfUtils.getConstDynConf(_conf);
         }
 
-        // Built the feature provider
+        // retrieve the namespace driver
+        NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri);
+        URI normalizedUri = DLUtils.normalizeURI(_uri);
+
+        // build the feature provider
         FeatureProvider featureProvider;
         if (null == _featureProvider) {
             featureProvider = new SettableFeatureProvider("", 0);
@@ -187,25 +210,69 @@ public class DistributedLogNamespaceBuilder {
             featureProvider = _featureProvider;
         }
 
-        URI bkUri;
-        try {
-            bkUri = new URI(
-                    schemeParts[0],     // remove backend info from bookkeeper backend
-                    _uri.getAuthority(),
-                    _uri.getPath(),
-                    _uri.getQuery(),
-                    _uri.getFragment());
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException("Invalid distributedlog uri found : " + _uri, e);
-        }
+        // build the failure injector
+        AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
+                .injectDelays(_conf.getEIInjectReadAheadDelay(),
+                              _conf.getEIInjectReadAheadDelayPercent(),
+                              _conf.getEIInjectMaxReadAheadDelayMs())
+                .injectErrors(false, 10)
+                .injectStops(_conf.getEIInjectReadAheadStall(), 10)
+                .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries())
+                .build();
 
-        return BKDistributedLogNamespace.newBuilder()
-                .conf(_conf)
-                .uri(bkUri)
-                .statsLogger(_statsLogger)
-                .featureProvider(featureProvider)
-                .clientId(_clientId)
-                .regionId(_regionId)
+        // normalize the per log stats logger
+        StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
+
+        // build the scheduler
+        StatsLogger schedulerStatsLogger = _statsLogger.scope("factory").scope("thread_pool");
+        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                .name("DLM-" + normalizedUri.getPath())
+                .corePoolSize(_conf.getNumWorkerThreads())
+                .statsLogger(schedulerStatsLogger)
+                .perExecutorStatsLogger(schedulerStatsLogger)
+                .traceTaskExecution(_conf.getEnableTaskExecutionStats())
+                .traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros())
                 .build();
+
+        // initialize the namespace driver
+        driver.initialize(
+                _conf,
+                _dynConf,
+                normalizedUri,
+                scheduler,
+                featureProvider,
+                failureInjector,
+                _statsLogger,
+                perLogStatsLogger,
+                DLUtils.normalizeClientId(_clientId),
+                _regionId);
+
+        // initialize the write limiter
+        PermitLimiter writeLimiter;
+        if (_conf.getGlobalOutstandingWriteLimit() < 0) {
+            writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+        } else {
+            Feature disableWriteLimitFeature = featureProvider.getFeature(
+                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
+            writeLimiter = new SimplePermitLimiter(
+                _conf.getOutstandingWriteLimitDarkmode(),
+                _conf.getGlobalOutstandingWriteLimit(),
+                _statsLogger.scope("writeLimiter"),
+                true /* singleton */,
+                disableWriteLimitFeature);
+        }
+
+        return new BKDistributedLogNamespace(
+                _conf,
+                normalizedUri,
+                driver,
+                scheduler,
+                featureProvider,
+                writeLimiter,
+                failureInjector,
+                _statsLogger,
+                perLogStatsLogger,
+                DLUtils.normalizeClientId(_clientId),
+                _regionId);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
new file mode 100644
index 0000000..738f124
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.namespace;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Manager to manage all the stores required by a namespace.
+ */
+public interface NamespaceDriver extends Closeable {
+
+    enum Role {
+        WRITER,
+        READER
+    }
+
+    /**
+     * Initialize the namespace manager.
+     *
+     * @param conf distributedlog configuration
+     * @param dynConf dynamic distributedlog configuration
+     * @param namespace root uri of the namespace
+     * @param scheduler ordered scheduler
+     * @param featureProvider feature provider
+     * @param statsLogger stats logger
+     * @param perLogStatsLogger per log stream stats logger
+     * @param clientId client id
+     * @return namespace manager
+     * @throws IOException when failed to initialize the namespace manager
+     */
+    NamespaceDriver initialize(DistributedLogConfiguration conf,
+                               DynamicDistributedLogConfiguration dynConf,
+                               URI namespace,
+                               OrderedScheduler scheduler,
+                               FeatureProvider featureProvider,
+                               AsyncFailureInjector failureInjector,
+                               StatsLogger statsLogger,
+                               StatsLogger perLogStatsLogger,
+                               String clientId,
+                               int regionId) throws IOException;
+
+    /**
+     * Get the scheme of the namespace driver.
+     *
+     * @return the scheme of the namespace driver.
+     */
+    String getScheme();
+
+    /**
+     * Get the root uri of the namespace driver.
+     *
+     * @return the root uri of the namespace driver.
+     */
+    URI getUri();
+
+    /**
+     * Retrieve the log {@code metadata store} used by the namespace.
+     *
+     * @return the log metadata store
+     */
+    LogMetadataStore getLogMetadataStore();
+
+    /**
+     * Retrieve the log stream {@code metadata store} used by the namespace.
+     *
+     * @param role the role to retrieve the log stream metadata store.
+     * @return the log stream metadata store
+     */
+    LogStreamMetadataStore getLogStreamMetadataStore(Role role);
+
+    /**
+     * Retrieve the log segment {@code entry store} used by the namespace.
+     *
+     * @param role the role to retrieve the log segment entry store.
+     * @return the log segment entry store.
+     * @throws IOException when failed to open log segment entry store.
+     */
+    LogSegmentEntryStore getLogSegmentEntryStore(Role role);
+
+    /**
+     * Create an access control manager to manage/check acl for logs.
+     *
+     * @return access control manager for logs under the namespace.
+     * @throws IOException
+     */
+    AccessControlManager getAccessControlManager()
+            throws IOException;
+
+    /**
+     * Retrieve the metadata accessor for log stream {@code streamName}.
+     * (TODO: it is a legacy interface. should remove it if we have metadata of stream.)
+     *
+     * @param streamName name of log stream.
+     * @return metadata accessor for log stream {@code streamName}.
+     */
+    MetadataAccessor getMetadataAccessor(String streamName)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Retrieve the subscriptions store for log stream {@code streamName}.
+     *
+     * @return the subscriptions store for log stream {@code streamName}
+     */
+    SubscriptionsStore getSubscriptionsStore(String streamName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
new file mode 100644
index 0000000..79945ad
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.namespace;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * The basic service for managing a set of namespace drivers.
+ */
+public class NamespaceDriverManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(NamespaceDriverManager.class);
+
+    static class NamespaceDriverInfo {
+
+        final Class<? extends NamespaceDriver> driverClass;
+        final String driverClassName;
+
+        NamespaceDriverInfo(Class<? extends NamespaceDriver> driverClass) {
+            this.driverClass = driverClass;
+            this.driverClassName = this.driverClass.getName();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("driver[")
+                    .append(driverClassName)
+                    .append("]");
+            return sb.toString();
+        }
+    }
+
+    private static final ConcurrentMap<String, NamespaceDriverInfo> drivers;
+    private static boolean initialized = false;
+
+    static {
+        drivers = new ConcurrentHashMap<String, NamespaceDriverInfo>();
+        initialize();
+    }
+
+    static void initialize() {
+        if (initialized) {
+            return;
+        }
+        loadInitialDrivers();
+        initialized = true;
+        logger.info("DistributedLog NamespaceDriverManager initialized");
+    }
+
+    private static void loadInitialDrivers() {
+        Set<String> driverList = Sets.newHashSet();
+        // add default bookkeeper based driver
+        driverList.add(BKNamespaceDriver.class.getName());
+        // load drivers from system property
+        String driversStr = System.getProperty("distributedlog.namespace.drivers");
+        if (null != driversStr) {
+            String[] driversArray = StringUtils.split(driversStr, ':');
+            for (String driver : driversArray) {
+                driverList.add(driver);
+            }
+        }
+        // initialize the drivers
+        for (String driverClsName : driverList) {
+            try {
+                NamespaceDriver driver =
+                        ReflectionUtils.newInstance(driverClsName, NamespaceDriver.class);
+                NamespaceDriverInfo driverInfo = new NamespaceDriverInfo(driver.getClass());
+                drivers.put(driver.getScheme().toLowerCase(), driverInfo);
+            } catch (Exception ex) {
+                logger.warn("Failed to load namespace driver {} : ", driverClsName, ex);
+            }
+        }
+    }
+
+    /**
+     * Prevent the NamespaceDriverManager class from being instantiated.
+     */
+    private NamespaceDriverManager() {}
+
+    /**
+     * Register the namespace {@code driver}.
+     *
+     * @param driver the namespace driver
+     * @return the namespace driver manager
+     */
+    public static void registerDriver(String backend, Class<? extends NamespaceDriver> driver) {
+        if (!initialized) {
+            initialize();
+        }
+
+        String scheme = backend.toLowerCase();
+        NamespaceDriverInfo oldDriverInfo = drivers.get(scheme);
+        if (null != oldDriverInfo) {
+            return;
+        }
+        NamespaceDriverInfo newDriverInfo = new NamespaceDriverInfo(driver);
+        oldDriverInfo = drivers.putIfAbsent(scheme, newDriverInfo);
+        if (null != oldDriverInfo) {
+            logger.debug("Driver for {} is already there.", scheme);
+        }
+    }
+
+    /**
+     * Retrieve the namespace driver for {@code scheme}.
+     *
+     * @param scheme the scheme for the namespace driver
+     * @return the namespace driver
+     * @throws NullPointerException when scheme is null
+     */
+    public static NamespaceDriver getDriver(String scheme) {
+        checkNotNull(scheme, "Driver Scheme is null");
+        if (!initialized) {
+            initialize();
+        }
+        NamespaceDriverInfo driverInfo = drivers.get(scheme.toLowerCase());
+        if (null == driverInfo) {
+            throw new IllegalArgumentException("Unknown backend " + scheme);
+        }
+        return ReflectionUtils.newInstance(driverInfo.driverClass);
+    }
+
+    /**
+     * Retrieve the namespace driver for {@code uri}.
+     *
+     * @param uri the distributedlog uri
+     * @return the namespace driver for {@code uri}
+     * @throws NullPointerException if the distributedlog {@code uri} is null or doesn't have scheme
+     *          or there is no namespace driver registered for the scheme
+     * @throws IllegalArgumentException if the distributedlog {@code uri} scheme is illegal
+     */
+    public static NamespaceDriver getDriver(URI uri) {
+        // Validate the uri and load the backend according to scheme
+        checkNotNull(uri, "DistributedLog uri is null");
+        String scheme = uri.getScheme();
+        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
+        scheme = scheme.toLowerCase();
+        String[] schemeParts = StringUtils.split(scheme, '-');
+        checkArgument(schemeParts.length > 0,
+                "Invalid distributedlog scheme found : " + uri);
+        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
+                "Unknown distributedlog scheme found : " + uri);
+        // bookkeeper is the default backend
+        String backend = DistributedLogConstants.BACKEND_BK;
+        if (schemeParts.length > 1) {
+            backend = schemeParts[1];
+        }
+        return getDriver(backend);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
deleted file mode 100644
index 9cd2da5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.subscription;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-public class ZKSubscriptionStateStore implements SubscriptionStateStore {
-
-    static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class);
-
-    private final ZooKeeperClient zooKeeperClient;
-    private final String zkPath;
-    private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
-
-    public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
-        this.zooKeeperClient = zooKeeperClient;
-        this.zkPath = zkPath;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Get the last committed position stored for this subscription
-     */
-    @Override
-    public Future<DLSN> getLastCommitPosition() {
-        if (null != lastCommittedPosition.get()) {
-            return Future.value(lastCommittedPosition.get());
-        } else {
-            return getLastCommitPositionFromZK();
-        }
-    }
-
-    Future<DLSN> getLastCommitPositionFromZK() {
-        final Promise<DLSN> result = new Promise<DLSN>();
-        try {
-            logger.debug("Reading last commit position from path {}", zkPath);
-            zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
-                    if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(DLSN.NonInclusiveLowerBound);
-                    } else if (KeeperException.Code.OK.intValue() != rc) {
-                        result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
-                    } else {
-                        try {
-                            DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
-                            result.setValue(dlsn);
-                        } catch (Exception t) {
-                            logger.warn("Invalid last commit position found from path {}", zkPath, t);
-                            // invalid dlsn recorded in subscription state store
-                            result.setValue(DLSN.NonInclusiveLowerBound);
-                        }
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
-        } catch (InterruptedException ie) {
-            result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
-        }
-        return result;
-    }
-
-    /**
-     * Advances the position associated with the subscriber
-     *
-     * @param newPosition - new commit position
-     */
-    @Override
-    public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
-        if (null == lastCommittedPosition.get() ||
-            (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
-            lastCommittedPosition.set(newPosition);
-            return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
-                zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
-                zooKeeperClient.getDefaultACL(),
-                CreateMode.PERSISTENT);
-        } else {
-            return Future.Done();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
deleted file mode 100644
index f1e6251..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.subscription;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * ZooKeeper Based Subscriptions Store.
- */
-public class ZKSubscriptionsStore implements SubscriptionsStore {
-
-    private final ZooKeeperClient zkc;
-    private final String zkPath;
-    private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
-            new ConcurrentHashMap<String, ZKSubscriptionStateStore>();
-
-    public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) {
-        this.zkc = zkc;
-        this.zkPath = zkPath;
-    }
-
-    private ZKSubscriptionStateStore getSubscriber(String subscriberId) {
-        ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
-        if (ss == null) {
-            ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
-                getSubscriberZKPath(subscriberId));
-            ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
-            if (oldSS == null) {
-                ss = newSS;
-            } else {
-                try {
-                    newSS.close();
-                } catch (IOException e) {
-                    // ignore the exception
-                }
-                ss = oldSS;
-            }
-        }
-        return ss;
-    }
-
-    private String getSubscriberZKPath(String subscriberId) {
-        return String.format("%s/%s", zkPath, subscriberId);
-    }
-
-    @Override
-    public Future<DLSN> getLastCommitPosition(String subscriberId) {
-        return getSubscriber(subscriberId).getLastCommitPosition();
-    }
-
-    @Override
-    public Future<Map<String, DLSN>> getLastCommitPositions() {
-        final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
-        try {
-            this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                    if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(new HashMap<String, DLSN>());
-                    } else if (KeeperException.Code.OK.intValue() != rc) {
-                        result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
-                    } else {
-                        getLastCommitPositions(result, children);
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
-        } catch (InterruptedException ie) {
-            result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
-        }
-        return result;
-    }
-
-    private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
-                                        List<String> subscribers) {
-        List<Future<Pair<String, DLSN>>> futures =
-                new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
-        for (String s : subscribers) {
-            final String subscriber = s;
-            Future<Pair<String, DLSN>> future =
-                // Get the last commit position from zookeeper
-                getSubscriber(subscriber).getLastCommitPositionFromZK().map(
-                        new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
-                            @Override
-                            public Pair<String, DLSN> apply(DLSN dlsn) {
-                                return Pair.of(subscriber, dlsn);
-                            }
-                        });
-            futures.add(future);
-        }
-        Future.collect(futures).foreach(
-            new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
-                    Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
-                    for (Pair<String, DLSN> pair : subscriptions) {
-                        subscriptionMap.put(pair.getLeft(), pair.getRight());
-                    }
-                    result.setValue(subscriptionMap);
-                    return BoxedUnit.UNIT;
-                }
-            });
-    }
-
-    @Override
-    public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
-        return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
-    }
-
-    @Override
-    public Future<Boolean> deleteSubscriber(String subscriberId) {
-        subscribers.remove(subscriberId);
-        String path = getSubscriberZKPath(subscriberId);
-        return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
-    }
-
-    @Override
-    public void close() throws IOException {
-        // no-op
-        for (SubscriptionStateStore store : subscribers.values()) {
-            store.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 30d6908..4565921 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -30,7 +30,6 @@ import java.io.PrintWriter;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Enumeration;
@@ -53,11 +52,16 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.BKDistributedLogNamespace;
 import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.MetadataAccessor;
 import com.twitter.distributedlog.callback.NamespaceListener;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.util.Utils;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -100,17 +104,15 @@ import com.twitter.distributedlog.ZooKeeperClientBuilder;
 import com.twitter.distributedlog.auditor.DLAuditor;
 import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.util.Await;
-import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 
 import static com.google.common.base.Charsets.UTF_8;
 
-@SuppressWarnings("deprecation")
 public class DistributedLogTool extends Tool {
 
     static final Logger logger = LoggerFactory.getLogger(DistributedLogTool.class);
@@ -161,7 +163,7 @@ public class DistributedLogTool extends Tool {
         protected URI uri;
         protected String zkAclId = null;
         protected boolean force = false;
-        protected com.twitter.distributedlog.DistributedLogManagerFactory factory = null;
+        protected DistributedLogNamespace namespace = null;
 
         protected PerDLCommand(String name, String description) {
             super(name, description);
@@ -187,8 +189,8 @@ public class DistributedLogTool extends Tool {
                 return runCmd();
             } finally {
                 synchronized (this) {
-                    if (null != factory) {
-                        factory.close();
+                    if (null != namespace) {
+                        namespace.close();
                     }
                 }
             }
@@ -252,35 +254,33 @@ public class DistributedLogTool extends Tool {
             this.force = force;
         }
 
-        protected synchronized com.twitter.distributedlog.DistributedLogManagerFactory getFactory() throws IOException {
-            if (null == this.factory) {
-                this.factory = new com.twitter.distributedlog.DistributedLogManagerFactory(getConf(), getUri());
-                logger.info("Construct DLM : uri = {}", getUri());
-            }
-            return this.factory;
-        }
-
         protected DistributedLogNamespace getNamespace() throws IOException {
-            return getFactory().getNamespace();
+            if (null == this.namespace) {
+                this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+                        .uri(getUri())
+                        .conf(getConf())
+                        .build();
+            }
+            return this.namespace;
         }
 
         protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException {
-            DistributedLogNamespace namespace = getFactory().getNamespace();
-            assert(namespace instanceof BKDistributedLogNamespace);
-            return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+            return getNamespace()
+                    .getNamespaceDriver()
+                    .getLogStreamMetadataStore(NamespaceDriver.Role.READER)
                     .getLogSegmentMetadataStore();
         }
 
         protected ZooKeeperClient getZooKeeperClient() throws IOException {
-            DistributedLogNamespace namespace = getFactory().getNamespace();
-            assert(namespace instanceof BKDistributedLogNamespace);
-            return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
+            NamespaceDriver driver = getNamespace().getNamespaceDriver();
+            assert(driver instanceof BKNamespaceDriver);
+            return ((BKNamespaceDriver) driver).getWriterZKC();
         }
 
         protected BookKeeperClient getBookKeeperClient() throws IOException {
-            DistributedLogNamespace namespace = getFactory().getNamespace();
-            assert(namespace instanceof BKDistributedLogNamespace);
-            return ((BKDistributedLogNamespace) namespace).getReaderBKC();
+            NamespaceDriver driver = getNamespace().getNamespaceDriver();
+            assert(driver instanceof BKNamespaceDriver);
+            return ((BKNamespaceDriver) driver).getReaderBKC();
         }
     }
 
@@ -347,6 +347,10 @@ public class DistributedLogTool extends Tool {
         }
     }
 
+    /**
+     * NOTE: we might consider adding a command to 'delete' namespace. The implementation of the namespace
+     *       driver should implement the 'delete' operation.
+     */
     protected static class DeleteAllocatorPoolCommand extends PerDLCommand {
 
         int concurrency = 1;
@@ -380,8 +384,12 @@ public class DistributedLogTool extends Tool {
             String rootPath = getUri().getPath() + "/" + allocationPoolPath;
             final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor();
             ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+            Preconditions.checkArgument(getNamespace() instanceof BKDistributedLogNamespace);
+            BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace();
+            final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC();
+            final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC();
             try {
-                List<String> pools = getZooKeeperClient().get().getChildren(rootPath, false);
+                List<String> pools = zkc.get().getChildren(rootPath, false);
                 final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>();
                 if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) {
                     for (String pool : pools) {
@@ -401,7 +409,7 @@ public class DistributedLogTool extends Tool {
                                     try {
                                         LedgerAllocator allocator =
                                                 LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
-                                                        getZooKeeperClient(), getBookKeeperClient(),
+                                                        zkc, bkc,
                                                         allocationExecutor);
                                         allocator.delete();
                                         System.out.println("Deleted allocator pool : " + poolPath + " .");
@@ -454,43 +462,35 @@ public class DistributedLogTool extends Tool {
 
         @Override
         protected int runCmd() throws Exception {
-            if (printMetadata) {
-                printStreamsWithMetadata(getFactory());
-            } else {
-                printStreams(getFactory());
-            }
+            printStreams(getNamespace());
             return 0;
         }
 
-        protected void printStreamsWithMetadata(com.twitter.distributedlog.DistributedLogManagerFactory factory)
-                throws Exception {
-            Map<String, byte[]> streams = factory.enumerateLogsWithMetadataInNamespace();
+        protected void printStreams(DistributedLogNamespace namespace) throws Exception {
+            Iterator<String> streams = namespace.getLogs();
             System.out.println("Streams under " + getUri() + " : ");
             System.out.println("--------------------------------");
-            for (Map.Entry<String, byte[]> entry : streams.entrySet()) {
-                println(entry.getKey());
-                if (null == entry.getValue() || entry.getValue().length == 0) {
+            while (streams.hasNext()) {
+                String streamName = streams.next();
+                System.out.println(streamName);
+                if (!printMetadata) {
+                    continue;
+                }
+                MetadataAccessor accessor =
+                        namespace.getNamespaceDriver().getMetadataAccessor(streamName);
+                byte[] metadata = accessor.getMetadata();
+                if (null == metadata || metadata.length == 0) {
                     continue;
                 }
                 if (printHex) {
-                    System.out.println(Hex.encodeHexString(entry.getValue()));
+                    System.out.println(Hex.encodeHexString(metadata));
                 } else {
-                    System.out.println(new String(entry.getValue(), UTF_8));
+                    System.out.println(new String(metadata, UTF_8));
                 }
                 System.out.println("");
             }
             System.out.println("--------------------------------");
         }
-
-        protected void printStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
-            Collection<String> streams = factory.enumerateAllLogsInNamespace();
-            System.out.println("Streams under " + getUri() + " : ");
-            System.out.println("--------------------------------");
-            for (String stream : streams) {
-                System.out.println(stream);
-            }
-            System.out.println("--------------------------------");
-        }
     }
 
     public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener {
@@ -609,16 +609,17 @@ public class DistributedLogTool extends Tool {
 
         private void inspectStreams(final SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
                 throws Exception {
-            Collection<String> streamCollection = getFactory().enumerateAllLogsInNamespace();
+            Iterator<String> streamCollection = getNamespace().getLogs();
             final List<String> streams = new ArrayList<String>();
-            if (null != streamPrefix) {
-                for (String s : streamCollection) {
+            while (streamCollection.hasNext()) {
+                String s = streamCollection.next();
+                if (null != streamPrefix) {
                     if (s.startsWith(streamPrefix)) {
                         streams.add(s);
                     }
+                } else {
+                    streams.add(s);
                 }
-            } else {
-                streams.addAll(streamCollection);
             }
             if (0 == streams.size()) {
                 return;
@@ -660,8 +661,7 @@ public class DistributedLogTool extends Tool {
             for (int i = startIdx; i < endIdx; i++) {
                 String s = streams.get(i);
                 BookKeeperClient bkc = getBookKeeperClient();
-                DistributedLogManager dlm =
-                        getFactory().createDistributedLogManagerWithSharedClients(s);
+                DistributedLogManager dlm = getNamespace().openLog(s);
                 try {
                     List<LogSegmentMetadata> segments = dlm.getLogSegments();
                     if (segments.size() <= 1) {
@@ -782,20 +782,21 @@ public class DistributedLogTool extends Tool {
         @Override
         protected int runCmd() throws Exception {
             getConf().setZkAclId(getZkAclId());
-            return truncateStreams(getFactory());
+            return truncateStreams(getNamespace());
         }
 
-        private int truncateStreams(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
-            Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+        private int truncateStreams(final DistributedLogNamespace namespace) throws Exception {
+            Iterator<String> streamCollection = namespace.getLogs();
             final List<String> streams = new ArrayList<String>();
-            if (null != streamPrefix) {
-                for (String s : streamCollection) {
+            while (streamCollection.hasNext()) {
+                String s = streamCollection.next();
+                if (null != streamPrefix) {
                     if (s.startsWith(streamPrefix)) {
                         streams.add(s);
                     }
+                } else {
+                    streams.add(s);
                 }
-            } else {
-                streams.addAll(streamCollection);
             }
             if (0 == streams.size()) {
                 return 0;
@@ -813,7 +814,7 @@ public class DistributedLogTool extends Tool {
                     @Override
                     public void run() {
                         try {
-                            truncateStreams(factory, streams, tid, numStreamsPerThreads);
+                            truncateStreams(namespace, streams, tid, numStreamsPerThreads);
                             System.out.println("Thread " + tid + " finished.");
                         } catch (IOException e) {
                             System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
@@ -828,14 +829,13 @@ public class DistributedLogTool extends Tool {
             return 0;
         }
 
-        private void truncateStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+        private void truncateStreams(DistributedLogNamespace namespace, List<String> streams,
                                      int tid, int numStreamsPerThreads) throws IOException {
             int startIdx = tid * numStreamsPerThreads;
             int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
             for (int i = startIdx; i < endIdx; i++) {
                 String s = streams.get(i);
-                DistributedLogManager dlm =
-                        factory.createDistributedLogManagerWithSharedClients(s);
+                DistributedLogManager dlm = namespace.openLog(s);
                 try {
                     if (deleteStream) {
                         dlm.delete();
@@ -930,7 +930,7 @@ public class DistributedLogTool extends Tool {
 
         @Override
         protected int runCmd() throws Exception {
-            DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+            DistributedLogManager dlm = getNamespace().openLog(getStreamName());
             try {
                 if (listEppStats) {
                     bkc = new SimpleBookKeeperClient(getConf(), getUri());
@@ -1078,7 +1078,7 @@ public class DistributedLogTool extends Tool {
 
         @Override
         protected int runCmd() throws Exception {
-            DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+            DistributedLogManager dlm = getNamespace().openLog(getStreamName());
             try {
                 long count = 0;
                 if (null == endDLSN) {
@@ -1141,7 +1141,7 @@ public class DistributedLogTool extends Tool {
         @Override
         protected int runCmd() throws Exception {
             getConf().setZkAclId(getZkAclId());
-            DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+            DistributedLogManager dlm = getNamespace().openLog(getStreamName());
             try {
                 dlm.delete();
             } finally {
@@ -1347,7 +1347,7 @@ public class DistributedLogTool extends Tool {
             }
             getConf().setZkAclId(getZkAclId());
             for (String stream : streams) {
-                getFactory().getNamespace().createLog(stream);
+                getNamespace().createLog(stream);
             }
             return 0;
         }
@@ -1435,7 +1435,7 @@ public class DistributedLogTool extends Tool {
 
         @Override
         protected int runCmd() throws Exception {
-            DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+            DistributedLogManager dlm = getNamespace().openLog(getStreamName());
             long totalCount = dlm.getLogRecordCount();
             try {
                 AsyncLogReader reader;
@@ -1536,7 +1536,7 @@ public class DistributedLogTool extends Tool {
 
         @Override
         protected int runCmd() throws Exception {
-            DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+            DistributedLogManager dlm = getNamespace().openLog(getStreamName());
             try {
                 return inspectAndRepair(dlm.getLogSegments());
             } finally {
@@ -2640,11 +2640,11 @@ public class DistributedLogTool extends Tool {
         @Override
         protected int runCmd() throws Exception {
             getConf().setZkAclId(getZkAclId());
-            return truncateStream(getFactory(), getStreamName(), dlsn);
+            return truncateStream(getNamespace(), getStreamName(), dlsn);
         }
 
-        private int truncateStream(final com.twitter.distributedlog.DistributedLogManagerFactory factory, String streamName, DLSN dlsn) throws Exception {
-            DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+        private int truncateStream(final DistributedLogNamespace namespace, String streamName, DLSN dlsn) throws Exception {
+            DistributedLogManager dlm = namespace.openLog(streamName);
             try {
                 long totalRecords = dlm.getLogRecordCount();
                 long recordsAfterTruncate = Await.result(dlm.getLogRecordCountAsync(dlsn));
@@ -2731,7 +2731,6 @@ public class DistributedLogTool extends Tool {
         int numThreads = 1;
         String streamPrefix = null;
         String subscriberId = null;
-        AtomicInteger streamIndex = new AtomicInteger();
 
         DeleteSubscriberCommand() {
             super("delete_subscriber", "Delete the subscriber in subscription store. ");
@@ -2764,20 +2763,21 @@ public class DistributedLogTool extends Tool {
         @Override
         protected int runCmd() throws Exception {
             getConf().setZkAclId(getZkAclId());
-            return deleteSubscriber(getFactory());
+            return deleteSubscriber(getNamespace());
         }
 
-        private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
-            Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+        private int deleteSubscriber(final DistributedLogNamespace namespace) throws Exception {
+            Iterator<String> streamCollection = namespace.getLogs();
             final List<String> streams = new ArrayList<String>();
-            if (null != streamPrefix) {
-                for (String s : streamCollection) {
+            while (streamCollection.hasNext()) {
+                String s = streamCollection.next();
+                if (null != streamPrefix) {
                     if (s.startsWith(streamPrefix)) {
                         streams.add(s);
                     }
+                } else {
+                    streams.add(s);
                 }
-            } else {
-                streams.addAll(streamCollection);
             }
             if (0 == streams.size()) {
                 return 0;
@@ -2796,7 +2796,7 @@ public class DistributedLogTool extends Tool {
                     @Override
                     public void run() {
                         try {
-                            deleteSubscriber(factory, streams, tid, numStreamsPerThreads);
+                            deleteSubscriber(namespace, streams, tid, numStreamsPerThreads);
                             System.out.println("Thread " + tid + " finished.");
                         } catch (Exception e) {
                             System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
@@ -2811,14 +2811,13 @@ public class DistributedLogTool extends Tool {
             return 0;
         }
 
-        private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+        private void deleteSubscriber(DistributedLogNamespace namespace, List<String> streams,
                                       int tid, int numStreamsPerThreads) throws Exception {
             int startIdx = tid * numStreamsPerThreads;
             int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
             for (int i = startIdx; i < endIdx; i++) {
                 final String s = streams.get(i);
-                DistributedLogManager dlm =
-                    factory.createDistributedLogManagerWithSharedClients(s);
+                DistributedLogManager dlm = namespace.openLog(s);
                 final CountDownLatch countDownLatch = new CountDownLatch(1);
                 dlm.getSubscriptionsStore().deleteSubscriber(subscriberId)
                     .addEventListener(new FutureEventListener<Boolean>() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
index 63db1fe..2f9e091 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
@@ -17,34 +17,27 @@
  */
 package com.twitter.distributedlog.util;
 
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.DistributedLogConstants;
 import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
 
+import java.net.InetAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization.
  */
 public class DLUtils {
 
-    static final Logger logger = LoggerFactory.getLogger(DLUtils.class);
-
-    /**
-     * Extract zk servers fro dl <i>uri</i>.
-     *
-     * @param uri
-     *          dl uri
-     * @return zk servers
-     */
-    public static String getZKServersFromDLUri(URI uri) {
-        return uri.getAuthority().replace(";", ",");
-    }
-
     /**
      * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>.
      *
@@ -224,4 +217,105 @@ public class DLUtils {
     public static long bytes2LogSegmentId(byte[] data) {
         return Long.parseLong(new String(data, UTF_8));
     }
+
+    /**
+     * Normalize the uri.
+     *
+     * @param uri the distributedlog uri.
+     * @return the normalized uri
+     */
+    public static URI normalizeURI(URI uri) {
+        checkNotNull(uri, "DistributedLog uri is null");
+        String scheme = uri.getScheme();
+        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
+        scheme = scheme.toLowerCase();
+        String[] schemeParts = StringUtils.split(scheme, '-');
+        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
+                "Unknown distributedlog scheme found : " + uri);
+        URI normalizedUri;
+        try {
+            normalizedUri = new URI(
+                    schemeParts[0],     // remove backend info
+                    uri.getAuthority(),
+                    uri.getPath(),
+                    uri.getQuery(),
+                    uri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e);
+        }
+        return normalizedUri;
+    }
+
+    private static String getHostIpLockClientId() {
+        try {
+            return InetAddress.getLocalHost().toString();
+        } catch(Exception ex) {
+            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
+        }
+    }
+
+    /**
+     * Normalize the client id.
+     *
+     * @return the normalized client id.
+     */
+    public static String normalizeClientId(String clientId) {
+        String normalizedClientId;
+        if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
+            normalizedClientId = getHostIpLockClientId();
+        } else {
+            normalizedClientId = clientId;
+        }
+        return normalizedClientId;
+    }
+
+    /**
+     * Is it a reserved stream name in bkdl namespace?
+     *
+     * @param name
+     *          stream name
+     * @return true if it is reserved name, otherwise false.
+     */
+    public static boolean isReservedStreamName(String name) {
+        return name.startsWith(".");
+    }
+
+    /**
+     * Validate the stream name.
+     *
+     * @param nameOfStream
+     *          name of stream
+     * @throws InvalidStreamNameException
+     */
+    public static void validateName(String nameOfStream)
+            throws InvalidStreamNameException {
+        String reason = null;
+        char chars[] = nameOfStream.toCharArray();
+        char c;
+        // validate the stream to see if meet zookeeper path's requirement
+        for (int i = 0; i < chars.length; i++) {
+            c = chars[i];
+
+            if (c == 0) {
+                reason = "null character not allowed @" + i;
+                break;
+            } else if (c == '/') {
+                reason = "'/' not allowed @" + i;
+                break;
+            } else if (c > '\u0000' && c < '\u001f'
+                    || c > '\u007f' && c < '\u009F'
+                    || c > '\ud800' && c < '\uf8ff'
+                    || c > '\ufff0' && c < '\uffff') {
+                reason = "invalid charater @" + i;
+                break;
+            }
+        }
+        if (null != reason) {
+            throw new InvalidStreamNameException(nameOfStream, reason);
+        }
+        if (isReservedStreamName(nameOfStream)) {
+            throw new InvalidStreamNameException(nameOfStream,
+                    "Stream Name is reserved");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 266409e..f206a25 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -271,7 +271,7 @@ public class FutureUtils {
      *
      * @param throwable the cause of the exception
      * @return the bk exception return code. if the exception isn't bk exceptions,
-     *         it would return bk exception code.
+     *         it would return {@link BKException.Code#UnexpectedConditionException}.
      */
     public static int bkResultCode(Throwable throwable) {
         if (throwable instanceof BKException) {
@@ -455,13 +455,13 @@ public class FutureUtils {
      * @param key submit key of the ordered scheduler
      */
     public static <T> void setException(final Promise<T> promise,
-                                        final Throwable throwable,
+                                        final Throwable cause,
                                         OrderedScheduler scheduler,
                                         Object key) {
         scheduler.submit(key, new Runnable() {
             @Override
             public void run() {
-                setException(promise, throwable);
+                setException(promise, cause);
             }
         });
     }