You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:29 UTC

[24/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
deleted file mode 100644
index 5921233..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.BookKeeperClientBuilder;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.MetadataAccessor;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.acl.DefaultAccessControlManager;
-import com.twitter.distributedlog.impl.acl.ZKAccessControlManager;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
-import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.namespace.NamespaceDriverManager;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName;
-import static com.twitter.distributedlog.util.DLUtils.validateName;
-
-/**
- * Manager for ZooKeeper/BookKeeper based namespace
- */
-public class BKNamespaceDriver implements NamespaceDriver {
-
-    // Shared logger; final so the reference cannot be reassigned after class initialization.
-    private static final Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class);
-
-    // Register this driver class with the driver manager under
-    // DistributedLogConstants.BACKEND_BK when the class is initialized.
-    static {
-        NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class);
-    }
-
-    /**
-     * Extract zk servers from dl <i>namespace</i>.
-     *
-     * <p>The servers are taken from the authority component of the uri, with every
-     * ';' replaced by ','.
-     *
-     * @param uri dl namespace
-     * @return zk servers
-     */
-    public static String getZKServersFromDLUri(URI uri) {
-        return uri.getAuthority().replace(";", ",");
-    }
-
-    // resources (passed from initialization)
-    private DistributedLogConfiguration conf;
-    private DynamicDistributedLogConfiguration dynConf;
-    private URI namespace;
-    private OrderedScheduler scheduler;
-    private FeatureProvider featureProvider;
-    private AsyncFailureInjector failureInjector;
-    private StatsLogger statsLogger;
-    private StatsLogger perLogStatsLogger;
-    private String clientId;
-    private int regionId;
-
-    //
-    // resources (created internally and initialized at #initialize())
-    //
-
-    // namespace binding
-    private BKDLConfig bkdlConfig;
-
-    // zookeeper clients
-    // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
-    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
-    //       keep builders and their client wrappers here, as they will be used when
-    //       instantiating readers or writers.
-    private ZooKeeperClientBuilder sharedWriterZKCBuilder;
-    private ZooKeeperClient writerZKC;
-    private ZooKeeperClientBuilder sharedReaderZKCBuilder;
-    private ZooKeeperClient readerZKC;
-    // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by
-    //       {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to
-    //       keep builders and their client wrappers here, as they will be used when
-    //       instantiating readers or writers.
-    private ClientSocketChannelFactory channelFactory;
-    private HashedWheelTimer requestTimer;
-    private BookKeeperClientBuilder sharedWriterBKCBuilder;
-    private BookKeeperClient writerBKC;
-    private BookKeeperClientBuilder sharedReaderBKCBuilder;
-    private BookKeeperClient readerBKC;
-
-    // log stream metadata store
-    private LogMetadataStore metadataStore;
-    private LogStreamMetadataStore writerStreamMetadataStore;
-    private LogStreamMetadataStore readerStreamMetadataStore;
-
-    //
-    // resources (lazily initialized)
-    //
-
-    // ledger allocator
-    private LedgerAllocator allocator;
-
-    // log segment entry stores
-    private LogSegmentEntryStore writerEntryStore;
-    private LogSegmentEntryStore readerEntryStore;
-
-    // access control manager
-    private AccessControlManager accessControlManager;
-
-    //
-    // states
-    //
-    protected boolean initialized = false;
-    protected AtomicBoolean closed = new AtomicBoolean(false);
-
-    /**
-     * Public constructor for reflection.
-     */
-    public BKNamespaceDriver() {
-    }
-
-    /**
-     * Initialize the driver with the resources of the namespace it serves.
-     *
-     * <p>If the driver is already initialized, this call does nothing and returns this driver.
-     * Otherwise it validates <i>namespace</i>, records the passed-in resources and then sets up,
-     * in order: the zookeeper clients, the bookkeeper clients, the configuration propagated from
-     * the namespace binding, the log metadata and log stream metadata stores, and the remaining
-     * resources (the ledger allocator).
-     *
-     * @return this driver
-     * @throws IOException if <i>namespace</i> is null or lacks an authority or a path, or if one
-     *         of the setup steps fails
-     */
-    @Override
-    public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf,
-                                                   DynamicDistributedLogConfiguration dynConf,
-                                                   URI namespace,
-                                                   OrderedScheduler scheduler,
-                                                   FeatureProvider featureProvider,
-                                                   AsyncFailureInjector failureInjector,
-                                                   StatsLogger statsLogger,
-                                                   StatsLogger perLogStatsLogger,
-                                                   String clientId,
-                                                   int regionId) throws IOException {
-        // initialization happens at most once; later calls are no-ops
-        if (initialized) {
-            return this;
-        }
-        // validate the namespace
-        if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) {
-            throw new IOException("Incorrect distributedlog namespace : " + namespace);
-        }
-
-        // initialize the resources
-        this.conf = conf;
-        this.dynConf = dynConf;
-        this.namespace = namespace;
-        this.scheduler = scheduler;
-        this.featureProvider = featureProvider;
-        this.failureInjector = failureInjector;
-        this.statsLogger = statsLogger;
-        this.perLogStatsLogger = perLogStatsLogger;
-        this.clientId = clientId;
-        this.regionId = regionId;
-
-        // initialize the zookeeper clients (this also resolves bkdlConfig, which the steps below read)
-        initializeZooKeeperClients();
-
-        // initialize the bookkeeper clients
-        initializeBookKeeperClients();
-
-        // propagate bkdlConfig to configuration
-        BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-
-        // initialize the log metadata & stream metadata store
-        initializeLogStreamMetadataStores();
-
-        // initialize other resources
-        initializeOtherResources();
-
-        initialized = true;
-
-        LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.",
-                new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()});
-        return this;
-    }
-
-    /**
-     * Create the writer and reader zookeeper clients and resolve the namespace binding.
-     *
-     * <p>The writer client connects to the servers named in the namespace uri and is used to
-     * resolve {@code bkdlConfig}. The reader client reuses the writer's builder when the binding
-     * lists the same servers for readers and writers; otherwise it gets a builder of its own.
-     */
-    private void initializeZooKeeperClients() throws IOException {
-        // Writer side: servers come straight from the namespace uri.
-        final String writerBuilderName = String.format("dlzk:%s:factory_writer_shared", namespace);
-        final String uriZkServers = getZKServersFromDLUri(namespace);
-        this.sharedWriterZKCBuilder = createZKClientBuilder(
-                writerBuilderName, conf, uriZkServers, statsLogger.scope("dlzk_factory_writer_shared"));
-        this.writerZKC = sharedWriterZKCBuilder.build();
-
-        // The binding stored in zookeeper tells us where readers and writers should connect.
-        this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace);
-
-        // Reader side: only build a dedicated builder when the reader servers differ.
-        final String zkServersForWriter = bkdlConfig.getDlZkServersForWriter();
-        final String zkServersForReader = bkdlConfig.getDlZkServersForReader();
-        if (!zkServersForWriter.equals(zkServersForReader)) {
-            final String readerBuilderName = String.format("dlzk:%s:factory_reader_shared", namespace);
-            this.sharedReaderZKCBuilder = createZKClientBuilder(
-                    readerBuilderName, conf, zkServersForReader, statsLogger.scope("dlzk_factory_reader_shared"));
-        } else {
-            this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder;
-        }
-        this.readerZKC = this.sharedReaderZKCBuilder.build();
-    }
-
-    // Returns the namespace binding; synchronized on this driver, like initialize(), which sets it.
-    private synchronized BKDLConfig getBkdlConfig() {
-        return bkdlConfig;
-    }
-
-    /**
-     * Create the netty channel factory, the request timer and the writer and reader
-     * bookkeeper clients.
-     *
-     * <p>The channel factory and the timer are shared by both client builders. The reader
-     * client reuses the writer's builder when the namespace binding lists the same zookeeper
-     * servers for bookkeeper readers and writers; otherwise a separate builder is created for it.
-     */
-    private void initializeBookKeeperClients() throws IOException {
-        this.channelFactory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
-                conf.getBKClientNumberIOThreads());
-        this.requestTimer = new HashedWheelTimer(
-                new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
-                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
-                conf.getTimeoutTimerNumTicks());
-        // Build bookkeeper client for writers
-        this.sharedWriterBKCBuilder = createBKCBuilder(
-                String.format("bk:%s:factory_writer_shared", namespace),
-                conf,
-                bkdlConfig.getBkZkServersForWriter(),
-                bkdlConfig.getBkLedgersPath(),
-                channelFactory,
-                requestTimer,
-                Optional.of(featureProvider.scope("bkc")),
-                statsLogger);
-        this.writerBKC = this.sharedWriterBKCBuilder.build();
-
-        // Build bookkeeper client for readers
-        if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) {
-            this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
-        } else {
-            // a dedicated reader builder is created without a feature provider
-            this.sharedReaderBKCBuilder = createBKCBuilder(
-                    String.format("bk:%s:factory_reader_shared", namespace),
-                    conf,
-                    bkdlConfig.getBkZkServersForReader(),
-                    bkdlConfig.getBkLedgersPath(),
-                    channelFactory,
-                    requestTimer,
-                    Optional.<FeatureProvider>absent(),
-                    statsLogger);
-        }
-        this.readerBKC = this.sharedReaderBKCBuilder.build();
-    }
-
-    /**
-     * Create the log metadata store and the writer and reader log stream metadata stores.
-     *
-     * <p>The federated log metadata store is used when either the namespace binding or the
-     * configuration asks for a federated namespace; the plain zookeeper one is used otherwise.
-     */
-    private void initializeLogStreamMetadataStores() throws IOException {
-        // log metadata store
-        final boolean useFederatedStore =
-                bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled();
-        if (!useFederatedStore) {
-            this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
-        } else {
-            this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
-        }
-
-        // log stream metadata stores: one bound to each zookeeper client
-        this.writerStreamMetadataStore =
-                new ZKLogStreamMetadataStore(clientId, conf, writerZKC, scheduler, statsLogger);
-        this.readerStreamMetadataStore =
-                new ZKLogStreamMetadataStore(clientId, conf, readerZKC, scheduler, statsLogger);
-    }
-
-    /**
-     * Validate the ledger allocator pool settings in <i>conf</i> and build the full pool path
-     * under the namespace <i>uri</i>.
-     *
-     * <p>The configured pool path must be non-null, start with "." and not end with "/"; the
-     * configured pool name must be non-null. The result is
-     * <code>uri.getPath() + "/" + poolPath + "/" + poolName</code>, which must also be accepted
-     * by {@link PathUtils#validatePath(String)}.
-     *
-     * @param conf distributedlog configuration providing the pool path and the pool name
-     * @param uri dl namespace
-     * @return the full path of the ledger allocator pool
-     * @throws IOException if the pool path or the pool name is missing or invalid
-     */
-    @VisibleForTesting
-    public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException {
-        String poolPath = conf.getLedgerAllocatorPoolPath();
-        LOG.info("PoolPath is {}", poolPath);
-        if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) {
-            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
-            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
-        }
-        String poolName = conf.getLedgerAllocatorPoolName();
-        if (null == poolName) {
-            LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
-            throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
-        }
-        String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName;
-        try {
-            PathUtils.validatePath(rootPath);
-        } catch (IllegalArgumentException iae) {
-            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
-            // keep the validation failure as the cause so the reason for rejection is not lost
-            throw new IOException("Invalid ledger allocator pool path specified : " + poolPath, iae);
-        }
-        return rootPath;
-    }
-
-    /**
-     * Set up the ledger allocator: a started allocator pool when the configuration enables
-     * the pool, and no allocator at all otherwise.
-     */
-    private void initializeOtherResources() throws IOException {
-        // Pool disabled: run without a ledger allocator.
-        if (!conf.getEnableLedgerAllocatorPool()) {
-            allocator = null;
-            return;
-        }
-
-        final String poolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace);
-        allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(
-                poolPath,
-                conf.getLedgerAllocatorPoolCoreSize(),
-                conf,
-                writerZKC,
-                writerBKC,
-                scheduler);
-        if (null != allocator) {
-            allocator.start();
-        }
-        LOG.info("Created ledger allocator pool under {} with size {}.", poolPath, conf.getLedgerAllocatorPoolCoreSize());
-    }
-
-    /**
-     * Fail if this driver has been closed.
-     *
-     * @throws IOException an {@link AlreadyClosedException} when the driver is closed
-     */
-    private void checkState() throws IOException {
-        if (!closed.get()) {
-            return;
-        }
-        LOG.error("BK namespace driver {} is already closed", namespace);
-        throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed");
-    }
-
-    @Override
-    public void close() throws IOException {
-        // Only the call that flips the flag from open to closed performs the shutdown;
-        // every other call returns immediately.
-        if (closed.compareAndSet(false, true)) {
-            doClose();
-        }
-    }
-
-    /**
-     * Release every resource held by this driver: the access control manager, the ledger
-     * allocator, the log stream metadata stores, the bookkeeper and zookeeper clients, the
-     * netty channel factory and the request timer.
-     *
-     * <p>Each resource is skipped when it was never created, so closing a driver that was not
-     * initialized, or whose initialization failed part way, does not fail on a null reference.
-     */
-    private void doClose() {
-        if (null != accessControlManager) {
-            accessControlManager.close();
-            LOG.info("Access Control Manager Stopped.");
-        }
-
-        // Close the allocator
-        if (null != allocator) {
-            Utils.closeQuietly(allocator);
-            LOG.info("Ledger Allocator stopped.");
-        }
-
-        // Shutdown log segment metadata stores
-        if (null != writerStreamMetadataStore) {
-            Utils.close(writerStreamMetadataStore);
-        }
-        if (null != readerStreamMetadataStore) {
-            Utils.close(readerStreamMetadataStore);
-        }
-
-        // Close the clients: bookkeeper first, then zookeeper
-        if (null != writerBKC) {
-            writerBKC.close();
-        }
-        if (null != readerBKC) {
-            readerBKC.close();
-        }
-        if (null != writerZKC) {
-            writerZKC.close();
-        }
-        if (null != readerZKC) {
-            readerZKC.close();
-        }
-
-        // release bookkeeper resources
-        if (null != channelFactory) {
-            channelFactory.releaseExternalResources();
-            LOG.info("Release external resources used by channel factory.");
-        }
-        if (null != requestTimer) {
-            requestTimer.stop();
-            LOG.info("Stopped request timer");
-        }
-    }
-
-    /** Returns the namespace uri this driver was initialized with. */
-    @Override
-    public URI getUri() {
-        return namespace;
-    }
-
-    /** Returns the scheme this driver registers itself under. */
-    @Override
-    public String getScheme() {
-        return DistributedLogConstants.BACKEND_BK;
-    }
-
-    /** Returns the log metadata store created during initialization. */
-    @Override
-    public LogMetadataStore getLogMetadataStore() {
-        return metadataStore;
-    }
-
-    /**
-     * Returns the log stream metadata store for <i>role</i>: the writer store for
-     * {@code Role.WRITER}, the reader store for any other role.
-     */
-    @Override
-    public LogStreamMetadataStore getLogStreamMetadataStore(Role role) {
-        return Role.WRITER == role ? writerStreamMetadataStore : readerStreamMetadataStore;
-    }
-
-    /**
-     * Returns the log segment entry store for <i>role</i>: the writer store for
-     * {@code Role.WRITER}, the reader store for any other role. The store is created on
-     * first use.
-     */
-    @Override
-    public LogSegmentEntryStore getLogSegmentEntryStore(Role role) {
-        return Role.WRITER == role ? getWriterEntryStore() : getReaderEntryStore();
-    }
-
-    // Returns the writer entry store, creating it on first use from the writer
-    // zookeeper and bookkeeper clients.
-    private LogSegmentEntryStore getWriterEntryStore() {
-        if (null != writerEntryStore) {
-            return writerEntryStore;
-        }
-        writerEntryStore = new BKLogSegmentEntryStore(
-                conf, dynConf, writerZKC, writerBKC, scheduler, allocator, statsLogger, failureInjector);
-        return writerEntryStore;
-    }
-
-    // Returns the reader entry store, creating it on first use from the reader
-    // bookkeeper client.
-    // NOTE(review): the store is given writerZKC, not readerZKC - confirm this is intentional.
-    private LogSegmentEntryStore getReaderEntryStore() {
-        if (null != readerEntryStore) {
-            return readerEntryStore;
-        }
-        readerEntryStore = new BKLogSegmentEntryStore(
-                conf, dynConf, writerZKC, readerBKC, scheduler, allocator, statsLogger, failureInjector);
-        return readerEntryStore;
-    }
-
-    /**
-     * Returns the access control manager of the namespace, creating it on first use.
-     *
-     * <p>When the namespace binding has no ACL root path the default access control manager is
-     * used; otherwise a zookeeper based one is created under the namespace path.
-     *
-     * @throws IOException if the configured ACL root path is not a reserved stream name
-     */
-    @Override
-    public AccessControlManager getAccessControlManager() throws IOException {
-        if (null != accessControlManager) {
-            return accessControlManager;
-        }
-
-        final String aclRootPath = getBkdlConfig().getACLRootPath();
-        if (null == aclRootPath) {
-            // no ACL root configured: fall back to the default manager
-            accessControlManager = DefaultAccessControlManager.INSTANCE;
-            LOG.info("Created default access control manager for {}", namespace);
-            return accessControlManager;
-        }
-
-        if (!isReservedStreamName(aclRootPath)) {
-            throw new IOException("Invalid Access Control List Root Path : " + aclRootPath);
-        }
-        final String zkRootPath = namespace.getPath() + "/" + aclRootPath;
-        LOG.info("Creating zk based access control manager @ {} for {}", zkRootPath, namespace);
-        accessControlManager = new ZKAccessControlManager(conf, readerZKC, zkRootPath, scheduler);
-        LOG.info("Created zk based access control manager @ {} for {}", zkRootPath, namespace);
-        return accessControlManager;
-    }
-
-    /**
-     * Creates a zookeeper based subscriptions store for <i>streamName</i>, rooted at the
-     * stream's subscribers path and backed by the writer zookeeper client.
-     */
-    @Override
-    public SubscriptionsStore getSubscriptionsStore(String streamName) {
-        final String subscribersPath = LogMetadataForReader.getSubscribersPath(
-                namespace, streamName, conf.getUnpartitionedStreamName());
-        return new ZKSubscriptionsStore(writerZKC, subscribersPath);
-    }
-
-    //
-    // Legacy Interfaces
-    //
-
-    /**
-     * Creates a zookeeper based metadata accessor for <i>streamName</i>.
-     *
-     * @param streamName name of the stream; checked with {@code validateName}
-     * @return a new metadata accessor built from the shared writer and reader zookeeper
-     *         client builders
-     * @throws UnsupportedOperationException if the namespace is federated
-     * @throws IOException if the driver is already closed
-     */
-    @Override
-    public MetadataAccessor getMetadataAccessor(String streamName)
-            throws InvalidStreamNameException, IOException {
-        // the federated check comes first, so it wins over the closed check
-        if (getBkdlConfig().isFederatedNamespace()) {
-            throw new UnsupportedOperationException();
-        }
-        checkState();
-        validateName(streamName);
-        return new ZKMetadataAccessor(
-                streamName,
-                conf,
-                namespace,
-                sharedWriterZKCBuilder,
-                sharedReaderZKCBuilder,
-                statsLogger);
-    }
-
-    /**
-     * Enumerate the children of the namespace root together with the data stored on each child.
-     *
-     * <p>Children with a reserved stream name are skipped. A child that no longer exists when
-     * it is inspected is reported with an empty byte array. If the namespace root itself does
-     * not exist, the result is empty.
-     *
-     * @return a map from child name to the data stored on that child
-     * @throws IOException if the calling thread is interrupted or zookeeper reports an error
-     */
-    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
-        throws IOException, IllegalArgumentException {
-        String namespaceRootPath = namespace.getPath();
-        HashMap<String, byte[]> result = new HashMap<String, byte[]>();
-        ZooKeeperClient zkc = writerZKC;
-        try {
-            ZooKeeper zk = Utils.sync(zkc, namespaceRootPath);
-            Stat currentStat = zk.exists(namespaceRootPath, false);
-            if (currentStat == null) {
-                return result;
-            }
-            List<String> children = zk.getChildren(namespaceRootPath, false);
-            for (String child : children) {
-                if (isReservedStreamName(child)) {
-                    continue;
-                }
-                String zkPath = String.format("%s/%s", namespaceRootPath, child);
-                currentStat = zk.exists(zkPath, false);
-                if (currentStat == null) {
-                    result.put(child, new byte[0]);
-                } else {
-                    result.put(child, zk.getData(zkPath, false, currentStat));
-                }
-            }
-        } catch (InterruptedException ie) {
-            // restore the interrupt status so callers further up can still observe it
-            Thread.currentThread().interrupt();
-            LOG.error("Interrupted while reading " + namespaceRootPath, ie);
-            throw new IOException("Interrupted while reading " + namespaceRootPath, ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error reading " + namespaceRootPath + " entry in zookeeper", ke);
-            throw new IOException("Error reading " + namespaceRootPath + " entry in zookeeper", ke);
-        }
-        return result;
-    }
-
-    //
-    // Zk & Bk Utils
-    //
-
-    /**
-     * Create a zookeeper client builder configured from the zookeeper settings in <i>conf</i>.
-     *
-     * <p>A bounded exponential backoff retry policy is installed only when the configured
-     * number of retries is positive; otherwise the builder is given a null retry policy.
-     *
-     * @param zkcName name given to the client builder
-     * @param conf distributedlog configuration
-     * @param zkServers zookeeper servers to connect to
-     * @param statsLogger stats logger handed to the builder
-     * @return the configured builder
-     */
-    public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName,
-                                                               DistributedLogConfiguration conf,
-                                                               String zkServers,
-                                                               StatsLogger statsLogger) {
-        final RetryPolicy zkRetryPolicy = conf.getZKNumRetries() > 0
-                ? new BoundExponentialBackoffRetryPolicy(
-                        conf.getZKRetryBackoffStartMillis(),
-                        conf.getZKRetryBackoffMaxMillis(),
-                        conf.getZKNumRetries())
-                : null;
-        final ZooKeeperClientBuilder zkcBuilder = ZooKeeperClientBuilder.newBuilder()
-            .name(zkcName)
-            .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-            .retryThreadCount(conf.getZKClientNumberRetryThreads())
-            .requestRateLimit(conf.getZKRequestRateLimit())
-            .zkServers(zkServers)
-            .retryPolicy(zkRetryPolicy)
-            .statsLogger(statsLogger)
-            .zkAclId(conf.getZkAclId());
-        final Object[] logArgs = new Object[] {
-                zkcName, zkServers, conf.getZKNumRetries(),
-                conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() };
-        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
-                + " maxRetryBackoff = {}, zkAclId = {}.", logArgs);
-        return zkcBuilder;
-    }
-
-    /**
-     * Create a bookkeeper client builder from the given settings and shared netty resources.
-     *
-     * @param bkcName name given to the client builder
-     * @param conf distributedlog configuration
-     * @param zkServers zookeeper servers handed to the builder
-     * @param ledgersPath ledgers path handed to the builder
-     * @param channelFactory channel factory handed to the builder
-     * @param requestTimer timer handed to the builder
-     * @param featureProviderOptional optional feature provider handed to the builder
-     * @param statsLogger stats logger handed to the builder
-     * @return the configured builder
-     */
-    private BookKeeperClientBuilder createBKCBuilder(String bkcName,
-                                                     DistributedLogConfiguration conf,
-                                                     String zkServers,
-                                                     String ledgersPath,
-                                                     ClientSocketChannelFactory channelFactory,
-                                                     HashedWheelTimer requestTimer,
-                                                     Optional<FeatureProvider> featureProviderOptional,
-                                                     StatsLogger statsLogger) {
-        final BookKeeperClientBuilder bkcBuilder = BookKeeperClientBuilder.newBuilder()
-                .name(bkcName)
-                .dlConfig(conf)
-                .zkServers(zkServers)
-                .ledgersPath(ledgersPath)
-                .channelFactory(channelFactory)
-                .requestTimer(requestTimer)
-                .featureProvider(featureProviderOptional)
-                .statsLogger(statsLogger);
-        final Object[] logArgs =
-                new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() };
-        LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
-                logArgs);
-        return bkcBuilder;
-    }
-
-    //
-    // Test Methods
-    //
-
-    /** Returns the writer zookeeper client. */
-    @VisibleForTesting
-    public ZooKeeperClient getWriterZKC() {
-        return writerZKC;
-    }
-
-    /** Returns the reader bookkeeper client. */
-    @VisibleForTesting
-    public BookKeeperClient getReaderBKC() {
-        return readerBKC;
-    }
-
-    /** Returns the failure injector passed in at initialization. */
-    @VisibleForTesting
-    public AsyncFailureInjector getFailureInjector() {
-        return this.failureInjector;
-    }
-
-    /** Returns the writer log stream metadata store. */
-    @VisibleForTesting
-    public LogStreamMetadataStore getWriterStreamMetadataStore() {
-        return writerStreamMetadataStore;
-    }
-
-    /** Returns the ledger allocator, or null when the allocator pool is not enabled. */
-    @VisibleForTesting
-    public LedgerAllocator getLedgerAllocator() {
-        return allocator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
deleted file mode 100644
index 50b1405..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import java.net.URI;
-import java.util.Iterator;
-import java.util.List;
-
-import static com.twitter.distributedlog.util.DLUtils.*;
-
-/**
- * ZooKeeper based log metadata store
- */
-public class ZKLogMetadataStore implements LogMetadataStore {
-
-    // namespace served by this store; every log lives directly under it
-    final URI namespace;
-    // the namespace wrapped once, so getLogLocation does not allocate per call
-    final Optional<URI> nsOptional;
-    final ZooKeeperClient zkc;
-    final ZKNamespaceWatcher nsWatcher;
-
-    /**
-     * Create a log metadata store for <i>namespace</i>.
-     *
-     * @param conf distributedlog configuration, handed to the namespace watcher
-     * @param namespace dl namespace
-     * @param zkc zookeeper client used to list the logs
-     * @param scheduler scheduler handed to the namespace watcher
-     */
-    public ZKLogMetadataStore(
-            DistributedLogConfiguration conf,
-            URI namespace,
-            ZooKeeperClient zkc,
-            OrderedScheduler scheduler) {
-        this.namespace = namespace;
-        this.nsOptional = Optional.of(this.namespace);
-        this.zkc = zkc;
-        this.nsWatcher = new ZKNamespaceWatcher(conf, namespace, zkc, scheduler);
-    }
-
-    /** Returns the namespace uri as the location of the log; nothing is created in zookeeper here. */
-    @Override
-    public Future<URI> createLog(String logName) {
-        return Future.value(namespace);
-    }
-
-    /** Returns the namespace uri as the location of any log name. */
-    @Override
-    public Future<Optional<URI>> getLogLocation(String logName) {
-        return Future.value(nsOptional);
-    }
-
-    /**
-     * List the logs under the namespace.
-     *
-     * <p>The namespace root is synced first and its children are fetched afterwards; children
-     * with a reserved stream name are left out. A missing namespace root yields an empty
-     * iterator. Any other zookeeper return code fails the future with a {@link ZKException};
-     * failing to obtain the zookeeper handle fails the future with the exception raised.
-     *
-     * @return a future of an iterator over the log names
-     */
-    @Override
-    public Future<Iterator<String>> getLogs() {
-        final Promise<Iterator<String>> promise = new Promise<Iterator<String>>();
-        final String nsRootPath = namespace.getPath();
-        try {
-            final ZooKeeper zk = zkc.get();
-            zk.sync(nsRootPath, new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int syncRc, String syncPath, Object ctx) {
-                    if (KeeperException.Code.OK.intValue() == syncRc) {
-                        zk.getChildren(nsRootPath, false, new AsyncCallback.Children2Callback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    List<String> results = Lists.newArrayListWithExpectedSize(children.size());
-                                    for (String child : children) {
-                                        if (!isReservedStreamName(child)) {
-                                            results.add(child);
-                                        }
-                                    }
-                                    promise.setValue(results.iterator());
-                                } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                                    // namespace root does not exist: no logs
-                                    List<String> streams = Lists.newLinkedList();
-                                    promise.setValue(streams.iterator());
-                                } else {
-                                    promise.setException(new ZKException("Error reading namespace " + nsRootPath,
-                                            KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    } else if (KeeperException.Code.NONODE.intValue() == syncRc) {
-                        // namespace root does not exist: no logs
-                        List<String> streams = Lists.newLinkedList();
-                        promise.setValue(streams.iterator());
-                    } else {
-                        promise.setException(new ZKException("Error reading namespace " + nsRootPath,
-                                KeeperException.Code.get(syncRc)));
-                    }
-                }
-            }, null);
-            // Nothing may throw past this point: the callbacks above own the promise from here on.
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            // restore the interrupt status so the caller can still observe it
-            Thread.currentThread().interrupt();
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /** Registers <i>listener</i> with the namespace watcher of this store. */
-    @Override
-    public void registerNamespaceListener(NamespaceListener listener) {
-        this.nsWatcher.registerListener(listener);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
deleted file mode 100644
index e55b2f2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Filters over the names of log segments whose metadata is kept in ZooKeeper.
- */
-public class ZKLogSegmentFilters {
-
-    static final Logger LOG = LoggerFactory.getLogger(ZKLogSegmentFilters.class);
-
-    /**
-     * Filter for the write handler: keeps every inprogress log segment, the completed log segment
-     * with the highest log segment sequence number, and any completed log segment whose sequence
-     * number cannot be read from its name. Names without a known prefix are logged and dropped.
-     * Sequence id and ledger sequence number assignment rely on the previous log segments.
-     */
-    public static final LogSegmentFilter WRITE_HANDLE_FILTER = new LogSegmentFilter() {
-        @Override
-        public Collection<String> filter(Collection<String> fullList) {
-            List<String> kept = new ArrayList<String>(fullList.size());
-            String newestCompleted = null;
-            long newestSeqNo = -1L;
-            for (String name : fullList) {
-                if (name.startsWith(DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX)) {
-                    kept.add(name);
-                    continue;
-                }
-                if (!name.startsWith(DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX)) {
-                    LOG.error("Unknown log segment name : {}", name);
-                    continue;
-                }
-                String[] parts = name.split("_");
-                // index of the log segment sequence number within the name:
-                //   logrecs_<logsegment_sequence_number> -> 1
-                //   logrecs_<start_tx_id>_<end_tx_id>_<logsegment_sequence_number>_<ledger_id>_<region_id> -> 3
-                int seqNoIndex = 2 == parts.length ? 1 : (6 == parts.length ? 3 : -1);
-                if (seqNoIndex < 0) {
-                    // name: logrecs_<start_tx_id>_<end_tx_id> or any unknown names; sequence unknown, keep it
-                    kept.add(name);
-                    continue;
-                }
-                try {
-                    long seqNo = Long.parseLong(parts[seqNoIndex]);
-                    if (seqNo > newestSeqNo) {
-                        newestSeqNo = seqNo;
-                        newestCompleted = name;
-                    }
-                } catch (NumberFormatException nfe) {
-                    LOG.warn("Unexpected sequence number in log segment {} :", name, nfe);
-                    kept.add(name);
-                }
-            }
-            // the newest completed segment goes last, after everything kept during the scan
-            if (null != newestCompleted) {
-                kept.add(newestCompleted);
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Filtered log segments {} from {}.", kept, fullList);
-            }
-            return kept;
-        }
-    };
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
deleted file mode 100644
index 2076dd8..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ /dev/null
@@ -1,503 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.distributedlog.zk.DefaultZKOp;
-import com.twitter.distributedlog.zk.ZKOp;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * ZooKeeper based log segment metadata store.
- */
-public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watcher, Children2Callback {
-
-    private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
-
-    private static final List<String> EMPTY_LIST = ImmutableList.of();
-
-    /**
-     * Reads the log segment names of one path and hands the outcome to the listeners registered for it.
-     */
-    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
-
-        private final String logSegmentsPath;
-        private final ZKLogSegmentMetadataStore store;
-        // delay used for the next retry; doubled after each failure, capped at store.maxZKBackoffMs
-        private int currentZKBackOffMs;
-
-        ReadLogSegmentsTask(String logSegmentsPath, ZKLogSegmentMetadataStore metadataStore) {
-            this.logSegmentsPath = logSegmentsPath;
-            this.store = metadataStore;
-            this.currentZKBackOffMs = metadataStore.minZKBackoffMs;
-        }
-
-        @Override
-        public void onSuccess(final Versioned<List<String>> segments) {
-            // reset the back off after a successful operation
-            currentZKBackOffMs = store.minZKBackoffMs;
-            store.notifyLogSegmentsUpdated(logSegmentsPath, store.listeners.get(logSegmentsPath), segments);
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            if (cause instanceof LogNotFoundException) {
-                // the log stream has been deleted: detach all listeners registered for the path and tell them
-                store.notifyLogStreamDeleted(logSegmentsPath,
-                        store.listeners.remove(logSegmentsPath));
-                return;
-            }
-            // any other failure: retry this task after the current delay, then grow the delay
-            final int retryDelayMs = currentZKBackOffMs;
-            currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs);
-            store.scheduleTask(logSegmentsPath, this, retryDelayMs);
-        }
-
-        @Override
-        public void run() {
-            if (null == store.listeners.get(logSegmentsPath)) {
-                logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
-                return;
-            }
-            // the store itself is passed as the watcher for this read; the result comes back to this task
-            store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this);
-        }
-    }
-
-    /**
-     * Wraps a log segment names listener and remembers the last version of the log segment list it
-     * was notified with, so that the wrapped listener is only notified of newer versions of the list.
-     */
-    static class VersionedLogSegmentNamesListener {
-
-        private final LogSegmentNamesListener listener;
-        // last list handed to the listener; starts as an empty list at Version.NEW
-        private Versioned<List<String>> lastNotifiedLogSegments;
-
-        VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
-            this.listener = listener;
-            this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW);
-        }
-
-        synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) {
-            Version notified = lastNotifiedLogSegments.getVersion();
-            if (notified != Version.NEW
-                    && notified.compare(logSegments.getVersion()) != Version.Occurred.BEFORE) {
-                return; // nothing delivered yet is older than this update, so skip it
-            }
-            lastNotifiedLogSegments = logSegments;
-            listener.onSegmentsUpdated(logSegments);
-        }
-
-        // hashCode, equals and toString all delegate to the wrapped listener
-        @Override
-        public int hashCode() {
-            return listener.hashCode();
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return obj instanceof VersionedLogSegmentNamesListener
-                    && listener.equals(((VersionedLogSegmentNamesListener) obj).listener);
-        }
-
-        @Override
-        public String toString() {
-            return listener.toString();
-        }
-    }
-
-    final DistributedLogConfiguration conf;
-    // settings
-    final int minZKBackoffMs;
-    final int maxZKBackoffMs;
-    final boolean skipMinVersionCheck;
-
-    final ZooKeeperClient zkc;
-    // log segment listeners
-    final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners;
-    // scheduler
-    final OrderedScheduler scheduler;
-    final ReentrantReadWriteLock closeLock;
-    boolean closed = false;
-
-    public ZKLogSegmentMetadataStore(DistributedLogConfiguration conf,
-                                     ZooKeeperClient zkc,
-                                     OrderedScheduler scheduler) {
-        this.conf = conf;
-        this.zkc = zkc;
-        this.listeners =
-                new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
-        this.scheduler = scheduler;
-        this.closeLock = new ReentrantReadWriteLock();
-        // settings, read once from the configuration at construction time
-        this.minZKBackoffMs = conf.getZKRetryBackoffStartMillis();
-        this.maxZKBackoffMs = conf.getZKRetryBackoffMaxMillis();
-        this.skipMinVersionCheck = conf.getDLLedgerMetadataSkipMinVersionCheck();
-    }
-
-    protected void scheduleTask(Object key, Runnable r, long delayMs) {
-        closeLock.readLock().lock();
-        try {
-            // once the store is closed, new delayed tasks are silently dropped
-            if (!closed) {
-                scheduler.schedule(key, r, delayMs, TimeUnit.MILLISECONDS);
-            }
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    protected void submitTask(Object key, Runnable r) {
-        closeLock.readLock().lock();
-        try {
-            // once the store is closed, new tasks are silently dropped
-            if (!closed) {
-                scheduler.submit(key, r);
-            }
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    // max sequence number and max transaction id
-
-    @Override
-    public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                                 LogMetadata logMetadata,
-                                                 Versioned<Long> lssn,
-                                                 Transaction.OpListener<Version> listener) {
-        Version version = lssn.getVersion();
-        assert(version instanceof ZkVersion);
-        byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
-        // the set is conditional on the znode version carried by lssn
-        int expectedZnodeVersion = ((ZkVersion) version).getZnodeVersion();
-        Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, expectedZnodeVersion);
-        txn.addOp(new ZKVersionedSetOp(setDataOp, listener));
-    }
-
-    @Override
-    public void storeMaxTxnId(Transaction<Object> txn,
-                              LogMetadataForWriter logMetadata,
-                              Versioned<Long> transactionId,
-                              Transaction.OpListener<Version> listener) {
-        Version version = transactionId.getVersion();
-        assert(version instanceof ZkVersion);
-        byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
-        // the set is conditional on the znode version carried by transactionId
-        int expectedZnodeVersion = ((ZkVersion) version).getZnodeVersion();
-        Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, expectedZnodeVersion);
-        txn.addOp(new ZKVersionedSetOp(setDataOp, listener));
-    }
-
-    // updates
-
-    @Override
-    public Transaction<Object> transaction() {
-        return new ZKTransaction(zkc); // each call returns a new transaction on this store's ZooKeeper client
-    }
-
-    @Override
-    public void createLogSegment(Transaction<Object> txn,
-                                 LogSegmentMetadata segment,
-                                 OpListener<Void> listener) {
-        // queue the creation of a persistent znode holding the segment's finalised data as UTF-8
-        Op createOp = Op.create(
-                segment.getZkPath(),
-                segment.getFinalisedData().getBytes(UTF_8),
-                zkc.getDefaultACL(),
-                CreateMode.PERSISTENT);
-        txn.addOp(DefaultZKOp.of(createOp, listener));
-    }
-
-    @Override
-    public void deleteLogSegment(Transaction<Object> txn,
-                                 final LogSegmentMetadata segment,
-                                 final OpListener<Void> listener) {
-        // version -1: the delete is not conditional on the znode's current version
-        Op deleteOp = Op.delete(segment.getZkPath(), -1);
-        logger.info("Delete segment : {}", segment);
-        // the caller's listener is wrapped so that it may be null and so that aborts can be translated
-        txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() {
-            @Override
-            public void onCommit(Void r) {
-                if (null != listener) {
-                    listener.onCommit(r);
-                }
-            }
-
-            @Override
-            public void onAbort(Throwable t) {
-                logger.info("Aborted transaction on deleting segment {}", segment);
-                // pull the ZooKeeper result code out of the failure, when it carries one
-                KeeperException.Code kc = null;
-                if (t instanceof KeeperException) {
-                    kc = ((KeeperException) t).code();
-                } else if (t instanceof ZKException) {
-                    kc = ((ZKException) t).getKeeperExceptionCode();
-                }
-                // a missing znode is reported as LogSegmentNotFoundException;
-                // every other failure is passed through unchanged
-                Throwable reported = KeeperException.Code.NONODE == kc
-                        ? new LogSegmentNotFoundException(segment.getZkPath())
-                        : t;
-                notifyAbort(reported);
-            }
-
-            // forwards the abort only when the caller supplied a listener
-            private void notifyAbort(Throwable t) {
-                if (null != listener) {
-                    listener.onAbort(t);
-                }
-            }
-        }));
-    }
-
-    @Override
-    public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
-        // version -1: overwrite the segment znode regardless of its version; no listener is attached
-        byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
-        txn.addOp(DefaultZKOp.of(Op.setData(segment.getZkPath(), finalisedData, -1), null));
-    }
-
-    // reads
-
-    /**
-     * Handles ZooKeeper watch events on behalf of the registered log segment listeners.
-     */
-    @Override
-    public void process(WatchedEvent event) {
-        if (event.getType() == Event.EventType.None
-                && event.getState() == Event.KeeperState.Expired) {
-            // session expired: schedule a re-read for a snapshot of the paths that currently have listeners
-            for (String logSegmentsPath : new HashSet<String>(listeners.keySet())) {
-                scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
-            }
-            return;
-        }
-        String path = event.getPath();
-        if (null == path) {
-            return;
-        }
-        switch (event.getType()) {
-            case NodeDeleted:
-                notifyLogStreamDeleted(path, listeners.remove(path));
-                break;
-            case NodeChildrenChanged:
-                new ReadLogSegmentsTask(path, this).run();
-                break;
-            default:
-                break;
-        }
-    }
-
-    @Override
-    public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
-        return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck); // flag was read from conf
-    }
-
-    Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
-        Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
-        try { // async read: this store is the callback and the promise is passed along as its ctx
-            zkc.get().getChildren(logSegmentsPath, watcher, this, result);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            result.setException(FutureUtils.zkException(e, logSegmentsPath));
-        } catch (InterruptedException e) {
-            result.setException(FutureUtils.zkException(e, logSegmentsPath));
-        }
-        return result; // completed later by processResult, or already failed by one of the catches
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        // ctx is the promise that zkGetLogSegmentNames handed to getChildren
-        Promise<Versioned<List<String>>> promise = (Promise<Versioned<List<String>>>) ctx;
-        if (KeeperException.Code.OK.intValue() == rc) {
-            // cversion: the number of changes to the children of this znode
-            ZkVersion childrenVersion = new ZkVersion(stat.getCversion());
-            promise.setValue(new Versioned<List<String>>(children, childrenVersion));
-        } else if (KeeperException.Code.NONODE.intValue() == rc) {
-            promise.setException(new LogNotFoundException("Log " + path + " not found"));
-        } else {
-            promise.setException(new ZKException("Failed to get log segments from " + path, KeeperException.Code.get(rc)));
-        }
-    }
-
-    /**
-     * Reads the log segment names under logSegmentsPath. A non-null listener is first registered for
-     * the path (unless the store is closed); the path's listeners are then notified with the same result.
-     */
-    @Override
-    public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
-                                                              LogSegmentNamesListener listener) {
-        Watcher zkWatcher;
-        if (null == listener) {
-            zkWatcher = null;
-        } else {
-            closeLock.readLock().lock();
-            try {
-                if (closed) {
-                    zkWatcher = null;
-                } else {
-                    Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
-                            listeners.get(logSegmentsPath);
-                    if (null == listenerSet) {
-                        Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
-                                new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
-                        Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
-                                listeners.putIfAbsent(logSegmentsPath, newListenerSet);
-                        listenerSet = null != oldListenerSet ? oldListenerSet : newListenerSet;
-                    }
-                    synchronized (listenerSet) {
-                        listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
-                        if (!listeners.containsKey(logSegmentsPath)) {
-                            // listener set has been removed, add it back
-                            if (null != listeners.putIfAbsent(logSegmentsPath, listenerSet)) {
-                                logger.debug("Listener set is already found for log segments path {}", logSegmentsPath);
-                            }
-                        }
-                    }
-                    zkWatcher = ZKLogSegmentMetadataStore.this;
-                }
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
-        if (null != listener) {
-            getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this));
-        }
-        return getLogSegmentNamesResult; // the read that feeds the listeners; no second getChildren is issued
-    }
-
-    /**
-     * Removes listener from the listeners registered for logSegmentsPath; a no-op once the store is closed.
-     */
-    @Override
-    public void unregisterLogSegmentListener(String logSegmentsPath,
-                                             LogSegmentNamesListener listener) {
-        closeLock.readLock().lock();
-        try {
-            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
-                    closed ? null : listeners.get(logSegmentsPath);
-            if (null != listenerSet) {
-                synchronized (listenerSet) {
-                    listenerSet.remove(listener);
-                    // drop the path's entry with its last listener, but only if it still maps to this set
-                    if (listenerSet.isEmpty()) {
-                        listeners.remove(logSegmentsPath, listenerSet);
-                    }
-                }
-            }
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        // Only raises the closed flag, so calling it again changes nothing. Once it is set,
-        // scheduleTask/submitTask drop new tasks, getLogSegmentNames no longer registers listeners
-        // and unregisterLogSegmentListener returns early. Neither zkc nor the scheduler is closed here.
-        closeLock.writeLock().lock();
-        try {
-            closed = true;
-        } finally {
-            closeLock.writeLock().unlock();
-        }
-    }
-
-    // Notifications
-
-    void notifyLogStreamDeleted(String logSegmentsPath,
-                                final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners) {
-        // Nothing to do when no listener set was registered for the path. Otherwise the callbacks
-        // are handed to submitTask keyed by the path (which drops them if the store is closed).
-        if (null != listeners) {
-            this.submitTask(logSegmentsPath, new Runnable() {
-                @Override
-                public void run() {
-                    for (LogSegmentNamesListener registered : listeners.keySet()) {
-                        registered.onLogStreamDeleted();
-                    }
-                }
-            });
-        }
-    }
-
-    void notifyLogSegmentsUpdated(String logSegmentsPath,
-                                  final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners,
-                                  final Versioned<List<String>> segments) {
-        // each versioned wrapper decides for itself whether this update is newer than its last one
-        if (null != listeners) {
-            this.submitTask(logSegmentsPath, new Runnable() {
-                @Override
-                public void run() {
-                    for (VersionedLogSegmentNamesListener registered : listeners.values()) {
-                        registered.onSegmentsUpdated(segments);
-                    }
-                }
-            });
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
deleted file mode 100644
index eeda804..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.MetadataAccessor;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.twitter.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri;
-
-/**
- * {@link MetadataAccessor} backed by ZooKeeper.
- *
- * <p>The metadata of a stream is kept in the znode returned by {@link #getZKPath()}.
- * Updates are issued through the writer zookeeper client and reads through the reader
- * zookeeper client.
- */
-public class ZKMetadataAccessor implements MetadataAccessor {
-    static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
-    protected final String name;
-    // Non-null once asyncClose() has been called; accessed while holding the monitor of 'this'.
-    protected Promise<Void> closePromise;
-    protected final URI uri;
-    // zookeeper clients
-    // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
-    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
-    //       keep builders and their client wrappers here, as they will be used when
-    //       instantiating readers or writers.
-    protected final ZooKeeperClientBuilder writerZKCBuilder;
-    protected final ZooKeeperClient writerZKC;
-    protected final boolean ownWriterZKC;
-    protected final ZooKeeperClientBuilder readerZKCBuilder;
-    protected final ZooKeeperClient readerZKC;
-    protected final boolean ownReaderZKC;
-
-    /**
-     * Create an accessor for the metadata of stream <i>name</i> under <i>uri</i>.
-     *
-     * @param name stream name
-     * @param conf configuration used when a zookeeper client builder has to be created here
-     * @param uri namespace uri
-     * @param writerZKCBuilder builder of the writer client; when null a builder is created
-     *                         here and the resulting client is closed by {@link #asyncClose()}
-     * @param readerZKCBuilder builder of the reader client; when null the reader servers are
-     *                         resolved from the DL metadata bindings and either the writer
-     *                         builder is reused or a dedicated builder is created
-     * @param statsLogger stats logger used to scope the stats of builders created here
-     */
-    ZKMetadataAccessor(String name,
-                       DistributedLogConfiguration conf,
-                       URI uri,
-                       ZooKeeperClientBuilder writerZKCBuilder,
-                       ZooKeeperClientBuilder readerZKCBuilder,
-                       StatsLogger statsLogger) {
-        this.name = name;
-        this.uri = uri;
-
-        if (null == writerZKCBuilder) {
-            this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
-                    .name(String.format("dlzk:%s:dlm_writer_shared", name))
-                    .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                    .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                    .requestRateLimit(conf.getZKRequestRateLimit())
-                    .zkAclId(conf.getZkAclId())
-                    .uri(uri)
-                    .retryPolicy(newRetryPolicy(conf))
-                    .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared"));
-            this.ownWriterZKC = true;
-        } else {
-            this.writerZKCBuilder = writerZKCBuilder;
-            this.ownWriterZKC = false;
-        }
-        this.writerZKC = this.writerZKCBuilder.build();
-
-        if (null == readerZKCBuilder) {
-            String zkServersForWriter = getZKServersFromDLUri(uri);
-            String zkServersForReader;
-            try {
-                BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
-                zkServersForReader = bkdlConfig.getDlZkServersForReader();
-            } catch (IOException e) {
-                // Fall back to the writer servers when the bindings cannot be resolved.
-                LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e);
-                zkServersForReader = zkServersForWriter;
-            }
-            if (zkServersForReader.equals(zkServersForWriter)) {
-                LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.",
-                         zkServersForWriter, name);
-                this.readerZKCBuilder = this.writerZKCBuilder;
-                this.ownReaderZKC = false;
-            } else {
-                this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
-                        .name(String.format("dlzk:%s:dlm_reader_shared", name))
-                        .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                        .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                        .requestRateLimit(conf.getZKRequestRateLimit())
-                        .zkServers(zkServersForReader)
-                        .retryPolicy(newRetryPolicy(conf))
-                        .zkAclId(conf.getZkAclId())
-                        .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared"));
-                this.ownReaderZKC = true;
-            }
-        } else {
-            this.readerZKCBuilder = readerZKCBuilder;
-            this.ownReaderZKC = false;
-        }
-        this.readerZKC = this.readerZKCBuilder.build();
-    }
-
-    /**
-     * Build the retry policy for a zookeeper client created by this accessor.
-     *
-     * @param conf configuration providing the retry settings
-     * @return a bound exponential backoff policy, or null when the configured number of
-     *         retries is not positive
-     */
-    private static RetryPolicy newRetryPolicy(DistributedLogConfiguration conf) {
-        if (conf.getZKNumRetries() <= 0) {
-            return null;
-        }
-        return new BoundExponentialBackoffRetryPolicy(
-                conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(),
-                conf.getZKNumRetries());
-    }
-
-    /**
-     * Get the name of the stream managed by this log manager
-     *
-     * @return streamName
-     */
-    @Override
-    public String getStreamName() {
-        return name;
-    }
-
-    /**
-     * Creates or update the metadata stored at the node associated with the
-     * name and URI.
-     *
-     * <p>When the node does not exist it is only created for non-empty metadata; null or
-     * empty metadata leaves zookeeper untouched. When the node exists its data is replaced,
-     * conditioned on the version observed by the preceding existence check.
-     *
-     * @param metadata opaque metadata to be stored for the node; may be null
-     * @throws AlreadyClosedException if this accessor has been closed
-     * @throws DLInterruptedException if interrupted while talking to zookeeper
-     * @throws IOException on any other failure
-     */
-    @Override
-    public void createOrUpdateMetadata(byte[] metadata) throws IOException {
-        checkClosedOrInError("createOrUpdateMetadata");
-
-        String zkPath = getZKPath();
-        LOG.debug("Setting application specific metadata on {}", zkPath);
-        try {
-            Stat currentStat = writerZKC.get().exists(zkPath, false);
-            if (currentStat == null) {
-                // deleteMetadata() passes null: guard it so that clearing the metadata of a
-                // missing node is a no-op instead of a NullPointerException.
-                if (null != metadata && metadata.length > 0) {
-                    Utils.zkCreateFullPathOptimistic(writerZKC,
-                            zkPath,
-                            metadata,
-                            writerZKC.getDefaultACL(),
-                            CreateMode.PERSISTENT);
-                }
-            } else {
-                writerZKC.get().setData(zkPath, metadata, currentStat.getVersion());
-            }
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie);
-        } catch (Exception exc) {
-            throw new IOException("Exception creating or updating container metadata", exc);
-        }
-    }
-
-    /**
-     * Delete the metadata stored at the associated node. This only deletes the metadata
-     * and not the node itself
-     *
-     * @throws AlreadyClosedException if this accessor has been closed
-     * @throws IOException if the update of the node fails
-     */
-    @Override
-    public void deleteMetadata() throws IOException {
-        checkClosedOrInError("deleteMetadata");
-        createOrUpdateMetadata(null);
-    }
-
-    /**
-     * Retrieve the metadata stored at the node
-     *
-     * @return byte array containing the metadata, or null when the node does not exist
-     * @throws AlreadyClosedException if this accessor has been closed
-     * @throws DLInterruptedException if interrupted while talking to zookeeper
-     * @throws IOException on any other failure
-     */
-    @Override
-    public byte[] getMetadata() throws IOException {
-        checkClosedOrInError("getMetadata");
-        String zkPath = getZKPath();
-        LOG.debug("Getting application specific metadata from {}", zkPath);
-        try {
-            Stat currentStat = readerZKC.get().exists(zkPath, false);
-            if (currentStat == null) {
-                return null;
-            } else {
-                return readerZKC.get().getData(zkPath, false, currentStat);
-            }
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Interrupted on reading container metadata", ie);
-        } catch (Exception e) {
-            throw new IOException("Error reading container metadata from zk", e);
-        }
-    }
-
-    /**
-     * Close the metadata accessor, freeing any resources it may hold.
-     *
-     * <p>Only the first call closes anything; later calls return the promise created by the
-     * first one. Failures on closing a zookeeper client are logged and do not fail the
-     * returned future.
-     *
-     * @return future represents the close result.
-     */
-    @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
-        //       the managers created by the namespace - whose zkc will be closed by namespace
-        // Each client is closed independently so that a failure on the writer client does
-        // not leave the reader client open.
-        if (ownWriterZKC) {
-            closeZKC(writerZKC, "writer");
-        }
-        if (ownReaderZKC) {
-            closeZKC(readerZKC, "reader");
-        }
-        FutureUtils.setValue(closeFuture, null);
-        return closeFuture;
-    }
-
-    /**
-     * Close a zookeeper client owned by this accessor, logging instead of propagating failures.
-     *
-     * @param zkc client to close
-     * @param role "writer" or "reader", used only in the log message
-     */
-    private void closeZKC(ZooKeeperClient zkc, String role) {
-        try {
-            zkc.close();
-        } catch (Exception e) {
-            LOG.warn("Exception while closing {} zookeeper client of metadata accessor {} : ",
-                     new Object[] { role, name, e });
-        }
-    }
-
-    /**
-     * Blocking variant of {@link #asyncClose()}.
-     *
-     * @throws IOException if waiting for the close result fails
-     */
-    @Override
-    public void close() throws IOException {
-        FutureUtils.result(asyncClose());
-    }
-
-    /**
-     * Fail if this accessor has already been closed.
-     *
-     * @param operation name of the operation being attempted, used in the exception message
-     * @throws AlreadyClosedException if {@link #asyncClose()} has been called
-     */
-    public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
-        if (null != closePromise) {
-            throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor");
-        }
-    }
-
-    /**
-     * @return path of the znode holding the metadata: the uri path followed by the stream name
-     */
-    protected String getZKPath() {
-        return String.format("%s/%s", uri.getPath(), name);
-    }
-
-    @VisibleForTesting
-    protected ZooKeeperClient getReaderZKC() {
-        return readerZKC;
-    }
-
-    @VisibleForTesting
-    protected ZooKeeperClient getWriterZKC() {
-        return writerZKC;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
deleted file mode 100644
index 06bc8fb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.namespace.NamespaceWatcher;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.twitter.distributedlog.util.DLUtils.*;
-
-/**
- * Watcher on watching a given namespace
- */
-public class ZKNamespaceWatcher extends NamespaceWatcher
-        implements Runnable, Watcher, AsyncCallback.Children2Callback {
-
-    static final Logger logger = LoggerFactory.getLogger(ZKNamespaceWatcher.class);
-
-    private final DistributedLogConfiguration conf;
-    private final URI uri;
-    private final ZooKeeperClient zkc;
-    private final OrderedScheduler scheduler;
-    private final AtomicBoolean namespaceWatcherSet = new AtomicBoolean(false);
-
-    public ZKNamespaceWatcher(DistributedLogConfiguration conf,
-                              URI uri,
-                              ZooKeeperClient zkc,
-                              OrderedScheduler scheduler) {
-        this.conf = conf;
-        this.uri = uri;
-        this.zkc = zkc;
-        this.scheduler = scheduler;
-    }
-
-    private void scheduleTask(Runnable r, long ms) {
-        try {
-            scheduler.schedule(r, ms, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree});
-        }
-    }
-
-    @Override
-    public void run() {
-        try {
-            doWatchNamespaceChanges();
-        } catch (Exception e) {
-            logger.error("Encountered unknown exception on watching namespace {} ", uri, e);
-        }
-    }
-
-    public void watchNamespaceChanges() {
-        if (!namespaceWatcherSet.compareAndSet(false, true)) {
-            return;
-        }
-        doWatchNamespaceChanges();
-    }
-
-    private void doWatchNamespaceChanges() {
-        try {
-            zkc.get().getChildren(uri.getPath(), this, this, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted on watching namespace changes for {} : ", uri, e);
-            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-        }
-    }
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        if (KeeperException.Code.OK.intValue() == rc) {
-            logger.info("Received updated logs under {} : {}", uri, children);
-            List<String> result = new ArrayList<String>(children.size());
-            for (String s : children) {
-                if (isReservedStreamName(s)) {
-                    continue;
-                }
-                result.add(s);
-            }
-            for (NamespaceListener listener : listeners) {
-                listener.onStreamsChanged(result.iterator());
-            }
-        } else {
-            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-        }
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (event.getType() == Event.EventType.None) {
-            if (event.getState() == Event.KeeperState.Expired) {
-                scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-            }
-            return;
-        }
-        if (event.getType() == Event.EventType.NodeChildrenChanged) {
-            // watch namespace changes again.
-            doWatchNamespaceChanges();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
deleted file mode 100644
index 8126723..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.acl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * An {@link AccessControlEntry} bound to the znode that stores it.
- *
- * <p>The entry is stored as Thrift JSON. The znode version observed by the last successful
- * create, update or read is remembered and used as the expected version of the next
- * {@link #update(ZooKeeperClient)}.
- */
-public class ZKAccessControl {
-
-    private static final int BUFFER_SIZE = 4096;
-
-    // Entry returned by deserialize() for a znode without payload.
-    // NOTE(review): the same instance is handed out to every caller - presumably callers
-    // treat it as read-only; confirm before mutating entries returned by read().
-    public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry();
-
-    /**
-     * Raised when the payload of an access control znode cannot be parsed.
-     */
-    public static class CorruptedAccessControlException extends IOException {
-
-        private static final long serialVersionUID = 5391285182476211603L;
-
-        public CorruptedAccessControlException(String zkPath, Throwable t) {
-            super("Access Control @ " + zkPath + " is corrupted.", t);
-        }
-    }
-
-    protected final AccessControlEntry accessControlEntry;
-    protected final String zkPath;
-    // written from the zookeeper callbacks of create() and update()
-    private int zkVersion;
-
-    /**
-     * Create an access control that has not been read from zookeeper (version -1).
-     *
-     * @param ace access control entry
-     * @param zkPath path of the znode storing the entry
-     */
-    public ZKAccessControl(AccessControlEntry ace, String zkPath) {
-        this(ace, zkPath, -1);
-    }
-
-    private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) {
-        this.accessControlEntry = ace;
-        this.zkPath = zkPath;
-        this.zkVersion = zkVersion;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(zkPath, accessControlEntry);
-    }
-
-    /**
-     * Two access controls are equal when both path and entry are equal; the znode version
-     * does not take part in the comparison.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof ZKAccessControl)) {
-            return false;
-        }
-        ZKAccessControl other = (ZKAccessControl) obj;
-        return Objects.equal(zkPath, other.zkPath) &&
-                Objects.equal(accessControlEntry, other.accessControlEntry);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("entry(path=").append(zkPath).append(", acl=")
-                .append(accessControlEntry).append(")");
-        return sb.toString();
-    }
-
-    @VisibleForTesting
-    public String getZKPath() {
-        return zkPath;
-    }
-
-    @VisibleForTesting
-    public AccessControlEntry getAccessControlEntry() {
-        return accessControlEntry;
-    }
-
-    /**
-     * Create the znode holding this entry as a persistent node.
-     *
-     * @param zkc zookeeper client
-     * @return future satisfied with this object once the node is created (the remembered
-     *         version becomes 0), or failed with the zookeeper, connection or
-     *         serialization error
-     */
-    public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new AsyncCallback.StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, String name) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                ZKAccessControl.this.zkVersion = 0;
-                                promise.setValue(ZKAccessControl.this);
-                            } else {
-                                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                            }
-                        }
-                    }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Overwrite the znode with this entry, expecting the remembered version.
-     *
-     * @param zkc zookeeper client
-     * @return future satisfied with this object once the data is set (the remembered version
-     *         is refreshed from the returned stat), or failed with the zookeeper, connection
-     *         or serialization error
-     */
-    public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        ZKAccessControl.this.zkVersion = stat.getVersion();
-                        promise.setValue(ZKAccessControl.this);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Read the access control stored at <i>zkPath</i>.
-     *
-     * @param zkc zookeeper client
-     * @param zkPath path of the znode to read
-     * @param watcher watcher passed to the zookeeper read
-     * @return future satisfied with the access control and the version it was read at, or
-     *         failed with the zookeeper, connection or deserialization error
-     */
-    public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-
-        try {
-            zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        try {
-                            AccessControlEntry ace = deserialize(zkPath, data);
-                            promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
-                        } catch (IOException ioe) {
-                            promise.setException(ioe);
-                        }
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Delete the znode at <i>zkPath</i> regardless of its version.
-     *
-     * @param zkc zookeeper client
-     * @param zkPath path of the znode to delete
-     * @return future satisfied when the node was deleted or did not exist, or failed with
-     *         the zookeeper or connection error
-     */
-    public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
-        final Promise<Void> promise = new Promise<Void>();
-
-        try {
-            zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (KeeperException.Code.OK.intValue() == rc ||
-                            KeeperException.Code.NONODE.intValue() == rc) {
-                        promise.setValue(null);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Serialize an entry as UTF-8 encoded Thrift JSON.
-     *
-     * @param ace entry to serialize
-     * @return serialized bytes
-     * @throws IOException if Thrift fails to write the entry
-     */
-    static byte[] serialize(AccessControlEntry ace) throws IOException {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.write(protocol);
-            transport.flush();
-            return transport.toString(UTF_8.name()).getBytes(UTF_8);
-        } catch (TException e) {
-            throw new IOException("Failed to serialize access control entry : ", e);
-        } catch (UnsupportedEncodingException uee) {
-            throw new IOException("Failed to serialize access control entry : ", uee);
-        }
-    }
-
-    /**
-     * Parse the payload of an access control znode.
-     *
-     * @param zkPath path the payload was read from, used in the error message
-     * @param data payload of the znode; null or empty yields
-     *             {@link #DEFAULT_ACCESS_CONTROL_ENTRY}
-     * @return the parsed entry
-     * @throws CorruptedAccessControlException if the payload is not a valid entry
-     */
-    static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException {
-        // A null payload is treated like an empty one instead of failing with a
-        // NullPointerException inside the zookeeper callback of read().
-        if (null == data || data.length == 0) {
-            return DEFAULT_ACCESS_CONTROL_ENTRY;
-        }
-
-        AccessControlEntry ace = new AccessControlEntry();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.read(protocol);
-        } catch (TException e) {
-            throw new CorruptedAccessControlException(zkPath, e);
-        }
-        return ace;
-    }
-
-}