You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:41 UTC

[27/31] incubator-distributedlog git commit: DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
deleted file mode 100644
index bf16256..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class ZKAccessControl {
-
-    private static final int BUFFER_SIZE = 4096;
-
-    public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry();
-
-    public static class CorruptedAccessControlException extends IOException {
-
-        private static final long serialVersionUID = 5391285182476211603L;
-
-        public CorruptedAccessControlException(String zkPath, Throwable t) {
-            super("Access Control @ " + zkPath + " is corrupted.", t);
-        }
-    }
-
-    protected final AccessControlEntry accessControlEntry;
-    protected final String zkPath;
-    private int zkVersion;
-
-    public ZKAccessControl(AccessControlEntry ace, String zkPath) {
-        this(ace, zkPath, -1);
-    }
-
-    private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) {
-        this.accessControlEntry = ace;
-        this.zkPath = zkPath;
-        this.zkVersion = zkVersion;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(zkPath, accessControlEntry);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof ZKAccessControl)) {
-            return false;
-        }
-        ZKAccessControl other = (ZKAccessControl) obj;
-        return Objects.equal(zkPath, other.zkPath) &&
-                Objects.equal(accessControlEntry, other.accessControlEntry);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("entry(path=").append(zkPath).append(", acl=")
-                .append(accessControlEntry).append(")");
-        return sb.toString();
-    }
-
-    public String getZKPath() {
-        return zkPath;
-    }
-
-    public AccessControlEntry getAccessControlEntry() {
-        return accessControlEntry;
-    }
-
-    public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new AsyncCallback.StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, String name) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                ZKAccessControl.this.zkVersion = 0;
-                                promise.setValue(ZKAccessControl.this);
-                            } else {
-                                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                            }
-                        }
-                    }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        ZKAccessControl.this.zkVersion = stat.getVersion();
-                        promise.setValue(ZKAccessControl.this);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-
-        try {
-            zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        try {
-                            AccessControlEntry ace = deserialize(zkPath, data);
-                            promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
-                        } catch (IOException ioe) {
-                            promise.setException(ioe);
-                        }
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
-        final Promise<Void> promise = new Promise<Void>();
-
-        try {
-            zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (KeeperException.Code.OK.intValue() == rc ||
-                            KeeperException.Code.NONODE.intValue() == rc) {
-                        promise.setValue(null);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    static byte[] serialize(AccessControlEntry ace) throws IOException {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.write(protocol);
-            transport.flush();
-            return transport.toString(UTF_8.name()).getBytes(UTF_8);
-        } catch (TException e) {
-            throw new IOException("Failed to serialize access control entry : ", e);
-        } catch (UnsupportedEncodingException uee) {
-            throw new IOException("Failed to serialize acesss control entry : ", uee);
-        }
-    }
-
-    static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException {
-        if (data.length == 0) {
-            return DEFAULT_ACCESS_CONTROL_ENTRY;
-        }
-
-        AccessControlEntry ace = new AccessControlEntry();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.read(protocol);
-        } catch (TException e) {
-            throw new CorruptedAccessControlException(zkPath, e);
-        }
-        return ace;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
deleted file mode 100644
index 9c89b4a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager}
- */
-public class ZKAccessControlManager implements AccessControlManager, Watcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
-
-    private static final int ZK_RETRY_BACKOFF_MS = 500;
-
-    protected final DistributedLogConfiguration conf;
-    protected final ZooKeeperClient zkc;
-    protected final String zkRootPath;
-    protected final ScheduledExecutorService scheduledExecutorService;
-
-    protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
-    protected ZKAccessControl defaultAccessControl;
-    protected volatile boolean closed = false;
-
-    public ZKAccessControlManager(DistributedLogConfiguration conf,
-                                  ZooKeeperClient zkc,
-                                  String zkRootPath,
-                                  ScheduledExecutorService scheduledExecutorService) throws IOException {
-        this.conf = conf;
-        this.zkc = zkc;
-        this.zkRootPath = zkRootPath;
-        this.scheduledExecutorService = scheduledExecutorService;
-        this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
-        try {
-            Await.result(fetchDefaultAccessControlEntry());
-        } catch (Throwable t) {
-            if (t instanceof InterruptedException) {
-                throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
-            } else if (t instanceof KeeperException) {
-                throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t);
-            } else if (t instanceof IOException) {
-                throw (IOException) t;
-            } else {
-                throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
-            }
-        }
-
-        try {
-            Await.result(fetchAccessControlEntries());
-        } catch (Throwable t) {
-            if (t instanceof InterruptedException) {
-                throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
-            } else if (t instanceof KeeperException) {
-                throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t);
-            } else if (t instanceof IOException) {
-                throw (IOException) t;
-            } else {
-                throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
-            }
-        }
-    }
-
-    protected AccessControlEntry getAccessControlEntry(String stream) {
-        ZKAccessControl entry = streamEntries.get(stream);
-        entry = null == entry ? defaultAccessControl : entry;
-        return entry.getAccessControlEntry();
-    }
-
-    @Override
-    public boolean allowWrite(String stream) {
-        return !getAccessControlEntry(stream).isDenyWrite();
-    }
-
-    @Override
-    public boolean allowTruncate(String stream) {
-        return !getAccessControlEntry(stream).isDenyTruncate();
-    }
-
-    @Override
-    public boolean allowDelete(String stream) {
-        return !getAccessControlEntry(stream).isDenyDelete();
-    }
-
-    @Override
-    public boolean allowAcquire(String stream) {
-        return !getAccessControlEntry(stream).isDenyAcquire();
-    }
-
-    @Override
-    public boolean allowRelease(String stream) {
-        return !getAccessControlEntry(stream).isDenyRelease();
-    }
-
-    @Override
-    public void close() {
-        closed = true;
-    }
-
-    private Future<Void> fetchAccessControlEntries() {
-        final Promise<Void> promise = new Promise<Void>();
-        fetchAccessControlEntries(promise);
-        return promise;
-    }
-
-    private void fetchAccessControlEntries(final Promise<Void> promise) {
-        try {
-            zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() != rc) {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                        return;
-                    }
-                    Set<String> streamsReceived = new HashSet<String>();
-                    streamsReceived.addAll(children);
-                    Set<String> streamsCached = streamEntries.keySet();
-                    Set<String> streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy();
-                    for (String s : streamsRemoved) {
-                        ZKAccessControl accessControl = streamEntries.remove(s);
-                        if (null != accessControl) {
-                            logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry());
-                        }
-                    }
-                    if (streamsReceived.isEmpty()) {
-                        promise.setValue(null);
-                        return;
-                    }
-                    final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
-                    final AtomicInteger numFailures = new AtomicInteger(0);
-                    for (String s : streamsReceived) {
-                        final String streamName = s;
-                        ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null)
-                                .addEventListener(new FutureEventListener<ZKAccessControl>() {
-
-                                    @Override
-                                    public void onSuccess(ZKAccessControl accessControl) {
-                                        streamEntries.put(streamName, accessControl);
-                                        logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry());
-                                        complete();
-                                    }
-
-                                    @Override
-                                    public void onFailure(Throwable cause) {
-                                        if (cause instanceof KeeperException.NoNodeException) {
-                                            streamEntries.remove(streamName);
-                                        } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
-                                            logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...",
-                                                        new Object[] { streamName, zkRootPath, cause });
-                                            streamEntries.remove(streamName);
-                                        } else {
-                                            if (1 == numFailures.incrementAndGet()) {
-                                                promise.setException(cause);
-                                            }
-                                        }
-                                        complete();
-                                    }
-
-                                    private void complete() {
-                                        if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
-                                            promise.setValue(null);
-                                        }
-                                    }
-                                });
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-    }
-
-    private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        fetchDefaultAccessControlEntry(promise);
-        return promise;
-    }
-
-    private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) {
-        ZKAccessControl.read(zkc, zkRootPath, this)
-            .addEventListener(new FutureEventListener<ZKAccessControl>() {
-                @Override
-                public void onSuccess(ZKAccessControl accessControl) {
-                    logger.info("Default Access Control will be changed from {} to {}",
-                                ZKAccessControlManager.this.defaultAccessControl,
-                                accessControl);
-                    ZKAccessControlManager.this.defaultAccessControl = accessControl;
-                    promise.setValue(accessControl);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    if (cause instanceof KeeperException.NoNodeException) {
-                        logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath);
-                        createDefaultAccessControlEntryIfNeeded(promise);
-                    } else {
-                        promise.setException(cause);
-                    }
-                }
-            });
-    }
-
-    private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
-        ZooKeeper zk;
-        try {
-            zk = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-            return;
-        } catch (InterruptedException e) {
-            promise.setException(e);
-            return;
-        }
-        ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
-                CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    logger.info("Created zk path {} for default ACL.", zkRootPath);
-                    fetchDefaultAccessControlEntry(promise);
-                } else {
-                    promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-    }
-
-    private void refetchDefaultAccessControlEntry(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
-                    @Override
-                    public void onSuccess(ZKAccessControl value) {
-                        // no-op
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
-                            logger.warn("Default access control entry is corrupted, ignore this update : ", cause);
-                            return;
-                        }
-
-                        logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    private void refetchAccessControlEntries(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        // no-op
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    private void refetchAllAccessControlEntries(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
-                    @Override
-                    public void onSuccess(ZKAccessControl value) {
-                        fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
-                            @Override
-                            public void onSuccess(Void value) {
-                                // no-op
-                            }
-
-                            @Override
-                            public void onFailure(Throwable cause) {
-                                logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ",
-                                            ZK_RETRY_BACKOFF_MS, cause);
-                                refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (Event.EventType.None.equals(event.getType())) {
-            if (event.getState() == Event.KeeperState.Expired) {
-                refetchAllAccessControlEntries(0);
-            }
-        } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
-            logger.info("Default ACL for {} is changed, refetching ...", zkRootPath);
-            refetchDefaultAccessControlEntry(0);
-        } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {
-            logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath);
-            refetchAccessControlEntries(0);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
index 0a3fdb0..0512907 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
@@ -18,28 +18,27 @@
 package com.twitter.distributedlog.admin;
 
 import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.google.common.collect.Lists;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ReadUtils;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.acl.ZKAccessControl;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.thrift.AccessControlEntry;
 import com.twitter.distributedlog.tools.DistributedLogTool;
-import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
@@ -61,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Admin Tool for DistributedLog.
  */
-@SuppressWarnings("deprecation")
 public class DistributedLogAdmin extends DistributedLogTool {
 
     static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
@@ -84,8 +83,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
     /**
      * Fix inprogress segment with lower ledger sequence number.
      *
-     * @param factory
-     *          dlm factory.
+     * @param namespace
+     *          dl namespace
      * @param metadataUpdater
      *          metadata updater.
      * @param streamName
@@ -96,12 +95,12 @@ public class DistributedLogAdmin extends DistributedLogTool {
      *          is confirmation needed before executing actual action.
      * @throws IOException
      */
-    public static void fixInprogressSegmentWithLowerSequenceNumber(final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+    public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
                                                                    final MetadataUpdater metadataUpdater,
                                                                    final String streamName,
                                                                    final boolean verbose,
                                                                    final boolean interactive) throws IOException {
-        DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+        DistributedLogManager dlm = namespace.openLog(streamName);
         try {
             List<LogSegmentMetadata> segments = dlm.getLogSegments();
             if (verbose) {
@@ -194,37 +193,37 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+                                                 final DistributedLogNamespace namespace,
                                                  final MetadataUpdater metadataUpdater,
                                                  final OrderedScheduler scheduler,
-                                                 final BookKeeperClient bkc,
-                                                 final String digestpw,
                                                  final boolean verbose,
                                                  final boolean interactive) throws IOException {
-        checkAndRepairDLNamespace(uri, factory, metadataUpdater, scheduler, bkc, digestpw, verbose, interactive, 1);
+        checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
     }
 
     public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+                                                 final DistributedLogNamespace namespace,
                                                  final MetadataUpdater metadataUpdater,
                                                  final OrderedScheduler scheduler,
-                                                 final BookKeeperClient bkc,
-                                                 final String digestpw,
                                                  final boolean verbose,
                                                  final boolean interactive,
                                                  final int concurrency) throws IOException {
         Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
         // 0. getting streams under a given uri.
-        Collection<String> streams = factory.enumerateAllLogsInNamespace();
+        Iterator<String> streamsIter = namespace.getLogs();
+        List<String> streams = Lists.newArrayList();
+        while (streamsIter.hasNext()) {
+            streams.add(streamsIter.next());
+        }
         if (verbose) {
-            System.out.println("- 0. checking " + streams.size() + " streams under " + uri);
+            System.out.println("- 0. checking streams under " + uri);
         }
         if (streams.size() == 0) {
             System.out.println("+ 0. nothing to check. quit.");
             return;
         }
         Map<String, StreamCandidate> streamCandidates =
-                checkStreams(factory, streams, scheduler, bkc, digestpw, concurrency);
+                checkStreams(namespace, streams, scheduler, concurrency);
         if (verbose) {
             System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
         }
@@ -248,11 +247,9 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     private static Map<String, StreamCandidate> checkStreams(
-            final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+            final DistributedLogNamespace namespace,
             final Collection<String> streams,
             final OrderedScheduler scheduler,
-            final BookKeeperClient bkc,
-            final String digestpw,
             final int concurrency) throws IOException {
         final LinkedBlockingQueue<String> streamQueue =
                 new LinkedBlockingQueue<String>();
@@ -275,7 +272,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
                     StreamCandidate candidate;
                     try {
                         LOG.info("Checking stream {}.", stream);
-                        candidate = checkStream(factory, stream, scheduler, bkc, digestpw);
+                        candidate = checkStream(namespace, stream, scheduler);
                         LOG.info("Checked stream {} - {}.", stream, candidate);
                     } catch (IOException e) {
                         LOG.error("Error on checking stream {} : ", stream, e);
@@ -316,12 +313,10 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     private static StreamCandidate checkStream(
-            final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+            final DistributedLogNamespace namespace,
             final String streamName,
-            final OrderedScheduler scheduler,
-            final BookKeeperClient bkc,
-            String digestpw) throws IOException {
-        DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+            final OrderedScheduler scheduler) throws IOException {
+        DistributedLogManager dlm = namespace.openLog(streamName);
         try {
             List<LogSegmentMetadata> segments = dlm.getLogSegments();
             if (segments.isEmpty()) {
@@ -330,7 +325,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
             List<Future<LogSegmentCandidate>> futures =
                     new ArrayList<Future<LogSegmentCandidate>>(segments.size());
             for (LogSegmentMetadata segment : segments) {
-                futures.add(checkLogSegment(streamName, segment, scheduler, bkc, digestpw));
+                futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
             }
             List<LogSegmentCandidate> segmentCandidates;
             try {
@@ -354,21 +349,16 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     private static Future<LogSegmentCandidate> checkLogSegment(
+            final DistributedLogNamespace namespace,
             final String streamName,
             final LogSegmentMetadata metadata,
-            final OrderedScheduler scheduler,
-            final BookKeeperClient bkc,
-            final String digestpw) {
+            final OrderedScheduler scheduler) {
         if (metadata.isInProgress()) {
             return Future.value(null);
         }
 
-        final LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
-                new DistributedLogConfiguration().setBKDigestPW(digestpw),
-                bkc,
-                scheduler,
-                NullStatsLogger.INSTANCE,
-                AsyncFailureInjector.NULL);
+        final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
+                .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
         return ReadUtils.asyncReadLastRecord(
                 streamName,
                 metadata,
@@ -432,6 +422,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
 
     /**
      * Unbind the bookkeeper environment for a given distributedlog uri.
+     *
+     * TODO: move unbind operation to namespace driver
      */
     class UnbindCommand extends OptsCommand {
 
@@ -491,6 +483,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
 
     /**
      * Bind Command to bind bookkeeper environment for a given distributed uri.
+     *
+     * TODO: move bind to namespace driver
      */
     class BindCommand extends OptsCommand {
 
@@ -559,7 +553,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
             if (cmdline.hasOption("dlzw")) {
                 dlZkServersForWriter = cmdline.getOptionValue("dlzw");
             } else {
-                dlZkServersForWriter = DLUtils.getZKServersFromDLUri(uri);
+                dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
             }
             if (cmdline.hasOption("dlzr")) {
                 dlZkServersForReader = cmdline.getOptionValue("dlzr");
@@ -689,7 +683,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
                 return -1;
             }
             for (String stream : streams) {
-                fixInprogressSegmentWithLowerSequenceNumber(getFactory(), metadataUpdater, stream, verbose, !getForce());
+                fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce());
             }
             return 0;
         }
@@ -739,10 +733,9 @@ public class DistributedLogAdmin extends DistributedLogTool {
                     .corePoolSize(Runtime.getRuntime().availableProcessors())
                     .build();
             ExecutorService executorService = Executors.newCachedThreadPool();
-            BookKeeperClient bkc = getBookKeeperClient();
             try {
-                checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, scheduler,
-                                          bkc, getConf().getBKDigestPW(), verbose, !getForce(), concurrency);
+                checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
+                                          verbose, !getForce(), concurrency);
             } finally {
                 SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
index e551c22..a081606 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
@@ -21,18 +21,20 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
-import com.twitter.distributedlog.BKDistributedLogNamespace;
 import com.twitter.distributedlog.BookKeeperClient;
 import com.twitter.distributedlog.BookKeeperClientBuilder;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientBuilder;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.util.DLUtils;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -52,8 +54,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,7 +74,6 @@ import static com.google.common.base.Charsets.UTF_8;
 /**
  * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk usage by streams.
  */
-@SuppressWarnings("deprecation")
 public class DLAuditor {
 
     private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class);
@@ -83,23 +84,23 @@ public class DLAuditor {
         this.conf = conf;
     }
 
-    private ZooKeeperClient getZooKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) {
-        DistributedLogNamespace namespace = factory.getNamespace();
-        assert(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
+    private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assert(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getWriterZKC();
     }
 
-    private BookKeeperClient getBookKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) {
-        DistributedLogNamespace namespace = factory.getNamespace();
-        assert(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getReaderBKC();
+    private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assert(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getReaderBKC();
     }
 
     private String validateAndGetZKServers(List<URI> uris) {
         URI firstURI = uris.get(0);
-        String zkServers = DLUtils.getZKServersFromDLUri(firstURI);
+        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
         for (URI uri : uris) {
-            if (!zkServers.equalsIgnoreCase(DLUtils.getZKServersFromDLUri(uri))) {
+            if (!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) {
                 throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster");
             }
         }
@@ -224,19 +225,23 @@ public class DLAuditor {
     private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths)
             throws IOException {
         final Set<Long> ledgers = new TreeSet<Long>();
-        List<com.twitter.distributedlog.DistributedLogManagerFactory> factories =
-                new ArrayList<com.twitter.distributedlog.DistributedLogManagerFactory>(uris.size());
+        List<DistributedLogNamespace> namespaces =
+                new ArrayList<DistributedLogNamespace>(uris.size());
         try {
             for (URI uri : uris) {
-                factories.add(new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri));
+                namespaces.add(
+                        DistributedLogNamespaceBuilder.newBuilder()
+                                .conf(conf)
+                                .uri(uri)
+                                .build());
             }
             final CountDownLatch doneLatch = new CountDownLatch(uris.size());
             final AtomicInteger numFailures = new AtomicInteger(0);
             ExecutorService executor = Executors.newFixedThreadPool(uris.size());
             try {
                 int i = 0;
-                for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) {
-                    final com.twitter.distributedlog.DistributedLogManagerFactory dlFactory = factory;
+                for (final DistributedLogNamespace namespace : namespaces) {
+                    final DistributedLogNamespace dlNamespace = namespace;
                     final URI uri = uris.get(i);
                     final List<String> aps = allocationPaths.get(i);
                     i++;
@@ -245,12 +250,12 @@ public class DLAuditor {
                         public void run() {
                             try {
                                 logger.info("Collecting ledgers from {} : {}", uri, aps);
-                                collectLedgersFromAllocator(uri, dlFactory, aps, ledgers);
+                                collectLedgersFromAllocator(uri, namespace, aps, ledgers);
                                 synchronized (ledgers) {
                                     logger.info("Collected {} ledgers from allocators for {} : {} ",
                                             new Object[]{ledgers.size(), uri, ledgers});
                                 }
-                                collectLedgersFromDL(uri, dlFactory, ledgers);
+                                collectLedgersFromDL(uri, namespace, ledgers);
                             } catch (IOException e) {
                                 numFailures.incrementAndGet();
                                 logger.info("Error to collect ledgers from DL : ", e);
@@ -273,15 +278,15 @@ public class DLAuditor {
                 executor.shutdown();
             }
         } finally {
-            for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) {
-                factory.close();
+            for (DistributedLogNamespace namespace : namespaces) {
+                namespace.close();
             }
         }
         return ledgers;
     }
 
     private void collectLedgersFromAllocator(final URI uri,
-                                             final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+                                             final DistributedLogNamespace namespace,
                                              final List<String> allocationPaths,
                                              final Set<Long> ledgers) throws IOException {
         final LinkedBlockingQueue<String> poolQueue =
@@ -289,7 +294,7 @@ public class DLAuditor {
         for (String allocationPath : allocationPaths) {
             String rootPath = uri.getPath() + "/" + allocationPath;
             try {
-                List<String> pools = getZooKeeperClient(factory).get().getChildren(rootPath, false);
+                List<String> pools = getZooKeeperClient(namespace).get().getChildren(rootPath, false);
                 for (String pool : pools) {
                     poolQueue.add(rootPath + "/" + pool);
                 }
@@ -318,11 +323,11 @@ public class DLAuditor {
 
             private void collectLedgersFromPool(String poolPath)
                     throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
-                List<String> allocators = getZooKeeperClient(factory).get()
+                List<String> allocators = getZooKeeperClient(namespace).get()
                                         .getChildren(poolPath, false);
                 for (String allocator : allocators) {
                     String allocatorPath = poolPath + "/" + allocator;
-                    byte[] data = getZooKeeperClient(factory).get().getData(allocatorPath, false, new Stat());
+                    byte[] data = getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
                     if (null != data && data.length > 0) {
                         try {
                             long ledgerId = DLUtils.bytes2LogSegmentId(data);
@@ -341,30 +346,31 @@ public class DLAuditor {
     }
 
     private void collectLedgersFromDL(final URI uri,
-                                      final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+                                      final DistributedLogNamespace namespace,
                                       final Set<Long> ledgers) throws IOException {
         logger.info("Enumerating {} to collect streams.", uri);
-        Collection<String> streams = factory.enumerateAllLogsInNamespace();
+        Iterator<String> streams = namespace.getLogs();
         final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
-        streamQueue.addAll(streams);
+        while (streams.hasNext()) {
+            streamQueue.add(streams.next());
+        }
 
         logger.info("Collected {} streams from uri {} : {}",
-                    new Object[] { streams.size(), uri, streams });
+                    new Object[] { streamQueue.size(), uri, streams });
 
         executeAction(streamQueue, 10, new Action<String>() {
             @Override
             public void execute(String stream) throws IOException {
-                collectLedgersFromStream(factory, stream, ledgers);
+                collectLedgersFromStream(namespace, stream, ledgers);
             }
         });
     }
 
-    private List<Long> collectLedgersFromStream(com.twitter.distributedlog.DistributedLogManagerFactory factory,
+    private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace,
                                                 String stream,
                                                 Set<Long> ledgers)
             throws IOException {
-        DistributedLogManager dlm = factory.createDistributedLogManager(stream,
-                com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients);
+        DistributedLogManager dlm = namespace.openLog(stream);
         try {
             List<LogSegmentMetadata> segments = dlm.getLogSegments();
             List<Long> sLedgers = new ArrayList<Long>();
@@ -388,21 +394,25 @@ public class DLAuditor {
      */
     public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException {
         logger.info("Collecting stream space usage for {}.", uri);
-        com.twitter.distributedlog.DistributedLogManagerFactory factory =
-                new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri);
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .build();
         try {
-            return calculateStreamSpaceUsage(uri, factory);
+            return calculateStreamSpaceUsage(uri, namespace);
         } finally {
-            factory.close();
+            namespace.close();
         }
     }
 
     private Map<String, Long> calculateStreamSpaceUsage(
-            final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory)
+            final URI uri, final DistributedLogNamespace namespace)
         throws IOException {
-        Collection<String> streams = factory.enumerateAllLogsInNamespace();
+        Iterator<String> streams = namespace.getLogs();
         final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
-        streamQueue.addAll(streams);
+        while (streams.hasNext()) {
+            streamQueue.add(streams.next());
+        }
 
         final Map<String, Long> streamSpaceUsageMap =
                 new ConcurrentSkipListMap<String, Long>();
@@ -412,7 +422,7 @@ public class DLAuditor {
             @Override
             public void execute(String stream) throws IOException {
                 streamSpaceUsageMap.put(stream,
-                        calculateStreamSpaceUsage(factory, stream));
+                        calculateStreamSpaceUsage(namespace, stream));
                 if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
                     logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri);
                 }
@@ -422,16 +432,15 @@ public class DLAuditor {
         return streamSpaceUsageMap;
     }
 
-    private long calculateStreamSpaceUsage(final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+    private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace,
                                            final String stream) throws IOException {
-        DistributedLogManager dlm = factory.createDistributedLogManager(stream,
-                com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients);
+        DistributedLogManager dlm = namespace.openLog(stream);
         long totalBytes = 0;
         try {
             List<LogSegmentMetadata> segments = dlm.getLogSegments();
             for (LogSegmentMetadata segment : segments) {
                 try {
-                    LedgerHandle lh = getBookKeeperClient(factory).get().openLedgerNoRecovery(segment.getLogSegmentId(),
+                    LedgerHandle lh = getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(),
                             BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
                     totalBytes += lh.getLength();
                     lh.close();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
deleted file mode 100644
index dd78a4e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-
-import java.net.URI;
-
-/**
- * Utils for bookkeeper based distributedlog implementation
- */
-public class BKDLUtils {
-
-    /**
-     * Is it a reserved stream name in bkdl namespace?
-     *
-     * @param name
-     *          stream name
-     * @return true if it is reserved name, otherwise false.
-     */
-    public static boolean isReservedStreamName(String name) {
-        return name.startsWith(".");
-    }
-
-    /**
-     * Validate the configuration and uri.
-     *
-     * @param conf
-     *          distributedlog configuration
-     * @param uri
-     *          distributedlog uri
-     * @throws IllegalArgumentException
-     */
-    public static void validateConfAndURI(DistributedLogConfiguration conf, URI uri)
-        throws IllegalArgumentException {
-        if (null == conf) {
-            throw new IllegalArgumentException("Incorrect Configuration");
-        } else {
-            conf.validate();
-        }
-        if ((null == uri) || (null == uri.getAuthority()) || (null == uri.getPath())) {
-            throw new IllegalArgumentException("Incorrect ZK URI");
-        }
-    }
-
-    /**
-     * Validate the stream name.
-     *
-     * @param nameOfStream
-     *          name of stream
-     * @throws InvalidStreamNameException
-     */
-    public static void validateName(String nameOfStream)
-            throws InvalidStreamNameException {
-        String reason = null;
-        char chars[] = nameOfStream.toCharArray();
-        char c;
-        // validate the stream to see if meet zookeeper path's requirement
-        for (int i = 0; i < chars.length; i++) {
-            c = chars[i];
-
-            if (c == 0) {
-                reason = "null character not allowed @" + i;
-                break;
-            } else if (c == '/') {
-                reason = "'/' not allowed @" + i;
-                break;
-            } else if (c > '\u0000' && c < '\u001f'
-                    || c > '\u007f' && c < '\u009F'
-                    || c > '\ud800' && c < '\uf8ff'
-                    || c > '\ufff0' && c < '\uffff') {
-                reason = "invalid charater @" + i;
-                break;
-            }
-        }
-        if (null != reason) {
-            throw new InvalidStreamNameException(nameOfStream, reason);
-        }
-        if (isReservedStreamName(nameOfStream)) {
-            throw new InvalidStreamNameException(nameOfStream,
-                    "Stream Name is reserved");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
new file mode 100644
index 0000000..5921233
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.distributedlog.BookKeeperClient;
+import com.twitter.distributedlog.BookKeeperClientBuilder;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClientBuilder;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.acl.DefaultAccessControlManager;
+import com.twitter.distributedlog.impl.acl.ZKAccessControlManager;
+import com.twitter.distributedlog.bk.LedgerAllocator;
+import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
+import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
+import com.twitter.distributedlog.namespace.NamespaceDriverManager;
+import com.twitter.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.Utils;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName;
+import static com.twitter.distributedlog.util.DLUtils.validateName;
+
+/**
+ * Manager for ZooKeeper/BookKeeper based namespace
+ */
+public class BKNamespaceDriver implements NamespaceDriver {
+
+    private static Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class);
+
+    // register itself
+    static {
+        NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class);
+    }
+
+    /**
+     * Extract zk servers fro dl <i>namespace</i>.
+     *
+     * @param uri dl namespace
+     * @return zk servers
+     */
+    public static String getZKServersFromDLUri(URI uri) {
+        return uri.getAuthority().replace(";", ",");
+    }
+
+    // resources (passed from initialization)
+    private DistributedLogConfiguration conf;
+    private DynamicDistributedLogConfiguration dynConf;
+    private URI namespace;
+    private OrderedScheduler scheduler;
+    private FeatureProvider featureProvider;
+    private AsyncFailureInjector failureInjector;
+    private StatsLogger statsLogger;
+    private StatsLogger perLogStatsLogger;
+    private String clientId;
+    private int regionId;
+
+    //
+    // resources (created internally and initialized at #initialize())
+    //
+
+    // namespace binding
+    private BKDLConfig bkdlConfig;
+
+    // zookeeper clients
+    // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
+    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
+    //       keep builders and their client wrappers here, as they will be used when
+    //       instantiating readers or writers.
+    private ZooKeeperClientBuilder sharedWriterZKCBuilder;
+    private ZooKeeperClient writerZKC;
+    private ZooKeeperClientBuilder sharedReaderZKCBuilder;
+    private ZooKeeperClient readerZKC;
+    // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by
+    //       {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to
+    //       keep builders and their client wrappers here, as they will be used when
+    //       instantiating readers or writers.
+    private ClientSocketChannelFactory channelFactory;
+    private HashedWheelTimer requestTimer;
+    private BookKeeperClientBuilder sharedWriterBKCBuilder;
+    private BookKeeperClient writerBKC;
+    private BookKeeperClientBuilder sharedReaderBKCBuilder;
+    private BookKeeperClient readerBKC;
+
+    // log stream metadata store
+    private LogMetadataStore metadataStore;
+    private LogStreamMetadataStore writerStreamMetadataStore;
+    private LogStreamMetadataStore readerStreamMetadataStore;
+
+    //
+    // resources (lazily initialized)
+    //
+
+    // ledger allocator
+    private LedgerAllocator allocator;
+
+    // log segment entry stores
+    private LogSegmentEntryStore writerEntryStore;
+    private LogSegmentEntryStore readerEntryStore;
+
+    // access control manager
+    private AccessControlManager accessControlManager;
+
+    //
+    // states
+    //
+    protected boolean initialized = false;
+    protected AtomicBoolean closed = new AtomicBoolean(false);
+
+    /**
+     * Public constructor for reflection.
+     */
+    public BKNamespaceDriver() {
+    }
+
+    @Override
+    public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf,
+                                                   DynamicDistributedLogConfiguration dynConf,
+                                                   URI namespace,
+                                                   OrderedScheduler scheduler,
+                                                   FeatureProvider featureProvider,
+                                                   AsyncFailureInjector failureInjector,
+                                                   StatsLogger statsLogger,
+                                                   StatsLogger perLogStatsLogger,
+                                                   String clientId,
+                                                   int regionId) throws IOException {
+        if (initialized) {
+            return this;
+        }
+        // validate the namespace
+        if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) {
+            throw new IOException("Incorrect distributedlog namespace : " + namespace);
+        }
+
+        // initialize the resources
+        this.conf = conf;
+        this.dynConf = dynConf;
+        this.namespace = namespace;
+        this.scheduler = scheduler;
+        this.featureProvider = featureProvider;
+        this.failureInjector = failureInjector;
+        this.statsLogger = statsLogger;
+        this.perLogStatsLogger = perLogStatsLogger;
+        this.clientId = clientId;
+        this.regionId = regionId;
+
+        // initialize the zookeeper clients
+        initializeZooKeeperClients();
+
+        // initialize the bookkeeper clients
+        initializeBookKeeperClients();
+
+        // propagate bkdlConfig to configuration
+        BKDLConfig.propagateConfiguration(bkdlConfig, conf);
+
+        // initialize the log metadata & stream metadata store
+        initializeLogStreamMetadataStores();
+
+        // initialize other resources
+        initializeOtherResources();
+
+        initialized = true;
+
+        LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.",
+                new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()});
+        return this;
+    }
+
+    private void initializeZooKeeperClients() throws IOException {
+        // Build the namespace zookeeper client
+        this.sharedWriterZKCBuilder = createZKClientBuilder(
+                String.format("dlzk:%s:factory_writer_shared", namespace),
+                conf,
+                getZKServersFromDLUri(namespace),
+                statsLogger.scope("dlzk_factory_writer_shared"));
+        this.writerZKC = sharedWriterZKCBuilder.build();
+
+        // Resolve namespace binding
+        this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace);
+
+        // Build zookeeper client for readers
+        if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) {
+            this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder;
+        } else {
+            this.sharedReaderZKCBuilder = createZKClientBuilder(
+                    String.format("dlzk:%s:factory_reader_shared", namespace),
+                    conf,
+                    bkdlConfig.getDlZkServersForReader(),
+                    statsLogger.scope("dlzk_factory_reader_shared"));
+        }
+        this.readerZKC = this.sharedReaderZKCBuilder.build();
+    }
+
+    private synchronized BKDLConfig getBkdlConfig() {
+        return bkdlConfig;
+    }
+
+    private void initializeBookKeeperClients() throws IOException {
+        this.channelFactory = new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
+                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
+                conf.getBKClientNumberIOThreads());
+        this.requestTimer = new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
+                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+                conf.getTimeoutTimerNumTicks());
+        // Build bookkeeper client for writers
+        this.sharedWriterBKCBuilder = createBKCBuilder(
+                String.format("bk:%s:factory_writer_shared", namespace),
+                conf,
+                bkdlConfig.getBkZkServersForWriter(),
+                bkdlConfig.getBkLedgersPath(),
+                channelFactory,
+                requestTimer,
+                Optional.of(featureProvider.scope("bkc")),
+                statsLogger);
+        this.writerBKC = this.sharedWriterBKCBuilder.build();
+
+        // Build bookkeeper client for readers
+        if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) {
+            this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
+        } else {
+            this.sharedReaderBKCBuilder = createBKCBuilder(
+                    String.format("bk:%s:factory_reader_shared", namespace),
+                    conf,
+                    bkdlConfig.getBkZkServersForReader(),
+                    bkdlConfig.getBkLedgersPath(),
+                    channelFactory,
+                    requestTimer,
+                    Optional.<FeatureProvider>absent(),
+                    statsLogger);
+        }
+        this.readerBKC = this.sharedReaderBKCBuilder.build();
+    }
+
+    private void initializeLogStreamMetadataStores() throws IOException {
+        // log metadata store
+        if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) {
+            this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
+        } else {
+            this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
+        }
+
+        // create log stream metadata store
+        this.writerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        writerZKC,
+                        scheduler,
+                        statsLogger);
+        this.readerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        readerZKC,
+                        scheduler,
+                        statsLogger);
+    }
+
+    @VisibleForTesting
+    public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException {
+        String poolPath = conf.getLedgerAllocatorPoolPath();
+        LOG.info("PoolPath is {}", poolPath);
+        if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) {
+            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
+            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
+        }
+        String poolName = conf.getLedgerAllocatorPoolName();
+        if (null == poolName) {
+            LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
+            throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
+        }
+        String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName;
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (IllegalArgumentException iae) {
+            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
+            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
+        }
+        return rootPath;
+    }
+
+    private void initializeOtherResources() throws IOException {
+        // Ledger allocator
+        if (conf.getEnableLedgerAllocatorPool()) {
+            String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace);
+            allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(
+                    allocatorPoolPath,
+                    conf.getLedgerAllocatorPoolCoreSize(),
+                    conf,
+                    writerZKC,
+                    writerBKC,
+                    scheduler);
+            if (null != allocator) {
+                allocator.start();
+            }
+            LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize());
+        } else {
+            allocator = null;
+        }
+
+    }
+
+    private void checkState() throws IOException {
+        if (closed.get()) {
+            LOG.error("BK namespace driver {} is already closed", namespace);
+            throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed");
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        doClose();
+    }
+
+    private void doClose() {
+        if (null != accessControlManager) {
+            accessControlManager.close();
+            LOG.info("Access Control Manager Stopped.");
+        }
+
+        // Close the allocator
+        if (null != allocator) {
+            Utils.closeQuietly(allocator);
+            LOG.info("Ledger Allocator stopped.");
+        }
+
+        // Shutdown log segment metadata stores
+        Utils.close(writerStreamMetadataStore);
+        Utils.close(readerStreamMetadataStore);
+
+        writerBKC.close();
+        readerBKC.close();
+        writerZKC.close();
+        readerZKC.close();
+        // release bookkeeper resources
+        channelFactory.releaseExternalResources();
+        LOG.info("Release external resources used by channel factory.");
+        requestTimer.stop();
+        LOG.info("Stopped request timer");
+    }
+
+    @Override
+    public URI getUri() {
+        return namespace;
+    }
+
+    @Override
+    public String getScheme() {
+        return DistributedLogConstants.BACKEND_BK;
+    }
+
+    @Override
+    public LogMetadataStore getLogMetadataStore() {
+        return metadataStore;
+    }
+
+    @Override
+    public LogStreamMetadataStore getLogStreamMetadataStore(Role role) {
+        if (Role.WRITER == role) {
+            return writerStreamMetadataStore;
+        } else {
+            return readerStreamMetadataStore;
+        }
+    }
+
+    @Override
+    public LogSegmentEntryStore getLogSegmentEntryStore(Role role) {
+        if (Role.WRITER == role) {
+            return getWriterEntryStore();
+        } else {
+            return getReaderEntryStore();
+        }
+    }
+
+    private LogSegmentEntryStore getWriterEntryStore() {
+        if (null == writerEntryStore) {
+            writerEntryStore = new BKLogSegmentEntryStore(
+                    conf,
+                    dynConf,
+                    writerZKC,
+                    writerBKC,
+                    scheduler,
+                    allocator,
+                    statsLogger,
+                    failureInjector);
+        }
+        return writerEntryStore;
+    }
+
+    private LogSegmentEntryStore getReaderEntryStore() {
+        if (null == readerEntryStore) {
+            readerEntryStore = new BKLogSegmentEntryStore(
+                    conf,
+                    dynConf,
+                    writerZKC,
+                    readerBKC,
+                    scheduler,
+                    allocator,
+                    statsLogger,
+                    failureInjector);
+        }
+        return readerEntryStore;
+    }
+
+    @Override
+    public AccessControlManager getAccessControlManager() throws IOException {
+        if (null == accessControlManager) {
+            String aclRootPath = getBkdlConfig().getACLRootPath();
+            // Build the access control manager
+            if (aclRootPath == null) {
+                accessControlManager = DefaultAccessControlManager.INSTANCE;
+                LOG.info("Created default access control manager for {}", namespace);
+            } else {
+                if (!isReservedStreamName(aclRootPath)) {
+                    throw new IOException("Invalid Access Control List Root Path : " + aclRootPath);
+                }
+                String zkRootPath = namespace.getPath() + "/" + aclRootPath;
+                LOG.info("Creating zk based access control manager @ {} for {}",
+                        zkRootPath, namespace);
+                accessControlManager = new ZKAccessControlManager(conf, readerZKC,
+                        zkRootPath, scheduler);
+                LOG.info("Created zk based access control manager @ {} for {}",
+                        zkRootPath, namespace);
+            }
+        }
+        return accessControlManager;
+    }
+
+    @Override
+    public SubscriptionsStore getSubscriptionsStore(String streamName) {
+        return new ZKSubscriptionsStore(
+                writerZKC,
+                LogMetadataForReader.getSubscribersPath(namespace, streamName, conf.getUnpartitionedStreamName()));
+    }
+
+    //
+    // Legacy Intefaces
+    //
+
+    @Override
+    public MetadataAccessor getMetadataAccessor(String streamName)
+            throws InvalidStreamNameException, IOException {
+        if (getBkdlConfig().isFederatedNamespace()) {
+            throw new UnsupportedOperationException();
+        }
+        checkState();
+        validateName(streamName);
+        return new ZKMetadataAccessor(
+                streamName,
+                conf,
+                namespace,
+                sharedWriterZKCBuilder,
+                sharedReaderZKCBuilder,
+                statsLogger);
+    }
+
+    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
+        throws IOException, IllegalArgumentException {
+        String namespaceRootPath = namespace.getPath();
+        HashMap<String, byte[]> result = new HashMap<String, byte[]>();
+        ZooKeeperClient zkc = writerZKC;
+        try {
+            ZooKeeper zk = Utils.sync(zkc, namespaceRootPath);
+            Stat currentStat = zk.exists(namespaceRootPath, false);
+            if (currentStat == null) {
+                return result;
+            }
+            List<String> children = zk.getChildren(namespaceRootPath, false);
+            for(String child: children) {
+                if (isReservedStreamName(child)) {
+                    continue;
+                }
+                String zkPath = String.format("%s/%s", namespaceRootPath, child);
+                currentStat = zk.exists(zkPath, false);
+                if (currentStat == null) {
+                    result.put(child, new byte[0]);
+                } else {
+                    result.put(child, zk.getData(zkPath, false, currentStat));
+                }
+            }
+        } catch (InterruptedException ie) {
+            LOG.error("Interrupted while deleting " + namespaceRootPath, ie);
+            throw new IOException("Interrupted while reading " + namespaceRootPath, ie);
+        } catch (KeeperException ke) {
+            LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
+            throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
+        }
+        return result;
+    }
+
+    //
+    // Zk & Bk Utils
+    //
+
+    public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName,
+                                                               DistributedLogConfiguration conf,
+                                                               String zkServers,
+                                                               StatsLogger statsLogger) {
+        RetryPolicy retryPolicy = null;
+        if (conf.getZKNumRetries() > 0) {
+            retryPolicy = new BoundExponentialBackoffRetryPolicy(
+                conf.getZKRetryBackoffStartMillis(),
+                conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
+        }
+        ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
+            .name(zkcName)
+            .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+            .retryThreadCount(conf.getZKClientNumberRetryThreads())
+            .requestRateLimit(conf.getZKRequestRateLimit())
+            .zkServers(zkServers)
+            .retryPolicy(retryPolicy)
+            .statsLogger(statsLogger)
+            .zkAclId(conf.getZkAclId());
+        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
+                + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(),
+                conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(),
+                conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() });
+        return builder;
+    }
+
+    private BookKeeperClientBuilder createBKCBuilder(String bkcName,
+                                                     DistributedLogConfiguration conf,
+                                                     String zkServers,
+                                                     String ledgersPath,
+                                                     ClientSocketChannelFactory channelFactory,
+                                                     HashedWheelTimer requestTimer,
+                                                     Optional<FeatureProvider> featureProviderOptional,
+                                                     StatsLogger statsLogger) {
+        BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
+                .name(bkcName)
+                .dlConfig(conf)
+                .zkServers(zkServers)
+                .ledgersPath(ledgersPath)
+                .channelFactory(channelFactory)
+                .requestTimer(requestTimer)
+                .featureProvider(featureProviderOptional)
+                .statsLogger(statsLogger);
+        LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
+                new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() });
+        return builder;
+    }
+
+    //
+    // Test Methods
+    //
+
+    @VisibleForTesting
+    public ZooKeeperClient getWriterZKC() {
+        return writerZKC;
+    }
+
+    @VisibleForTesting
+    public BookKeeperClient getReaderBKC() {
+        return readerBKC;
+    }
+
+    @VisibleForTesting
+    public AsyncFailureInjector getFailureInjector() {
+        return this.failureInjector;
+    }
+
+    @VisibleForTesting
+    public LogStreamMetadataStore getWriterStreamMetadataStore() {
+        return writerStreamMetadataStore;
+    }
+
+    @VisibleForTesting
+    public LedgerAllocator getLedgerAllocator() {
+        return allocator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
index b84ab2e..50b1405 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
@@ -36,7 +36,7 @@ import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
 
-import static com.twitter.distributedlog.impl.BKDLUtils.*;
+import static com.twitter.distributedlog.util.DLUtils.*;
 
 /**
  * ZooKeeper based log metadata store