You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:32 UTC

[27/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
deleted file mode 100644
index 74cd6cf..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.zk.ZKWatcherManager;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}.
- * It handles retries on session expires and provides a watcher manager {@link ZKWatcherManager}.
- *
- * <h3>Metrics</h3>
- * <ul>
- * <li> zookeeper operation stats are exposed under scope <code>zk</code> by
- * {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}
- * <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by
- * {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase}
- * <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code>
- * </ul>
- */
-public class ZooKeeperClient {
-
-    public static interface Credentials {
-
-        Credentials NONE = new Credentials() {
-            @Override
-            public void authenticate(ZooKeeper zooKeeper) {
-                // noop
-            }
-        };
-
-        void authenticate(ZooKeeper zooKeeper);
-    }
-
-    public static class DigestCredentials implements Credentials {
-
-        String username;
-        String password;
-
-        public DigestCredentials(String username, String password) {
-            this.username = username;
-            this.password = password;
-        }
-
-        @Override
-        public void authenticate(ZooKeeper zooKeeper) {
-            zooKeeper.addAuthInfo("digest", String.format("%s:%s", username, password).getBytes(UTF_8));
-        }
-    }
-
-    public interface ZooKeeperSessionExpireNotifier {
-        void notifySessionExpired();
-    }
-
-    /**
-     * Indicates an error connecting to a zookeeper cluster.
-     */
-    public static class ZooKeeperConnectionException extends IOException {
-        private static final long serialVersionUID = 6682391687004819361L;
-
-        public ZooKeeperConnectionException(String message) {
-            super(message);
-        }
-
-        public ZooKeeperConnectionException(String message, Throwable cause) {
-            super(message, cause);
-        }
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName());
-
-    private final String name;
-    private final int sessionTimeoutMs;
-    private final int defaultConnectionTimeoutMs;
-    private final String zooKeeperServers;
-    // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
-    // made from within long synchronized blocks.
-    private volatile ZooKeeper zooKeeper = null;
-    private final RetryPolicy retryPolicy;
-    private final StatsLogger statsLogger;
-    private final int retryThreadCount;
-    private final double requestRateLimit;
-    private final Credentials credentials;
-    private volatile boolean authenticated = false;
-    private Stopwatch disconnectedStopwatch = null;
-
-    private boolean closed = false;
-
-    final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
-
-    // watcher manager to manage watchers
-    private final ZKWatcherManager watcherManager;
-
-    /**
-     * Creates an unconnected client that will lazily attempt to connect on the first call to
-     * {@link #get}.  All successful connections will be authenticated with the given
-     * {@code credentials}.
-     *
-     * @param sessionTimeoutMs
-     *          ZK session timeout in milliseconds
-     * @param connectionTimeoutMs
-     *          ZK connection timeout in milliseconds
-     * @param zooKeeperServers
-     *          the set of servers forming the ZK cluster
-     */
-    ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) {
-        this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0,
-             Credentials.NONE);
-    }
-
-    ZooKeeperClient(String name,
-                    int sessionTimeoutMs,
-                    int connectionTimeoutMs,
-                    String zooKeeperServers,
-                    RetryPolicy retryPolicy,
-                    StatsLogger statsLogger,
-                    int retryThreadCount,
-                    double requestRateLimit,
-                    Credentials credentials) {
-        this.name = name;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.zooKeeperServers = zooKeeperServers;
-        this.defaultConnectionTimeoutMs = connectionTimeoutMs;
-        this.retryPolicy = retryPolicy;
-        this.statsLogger = statsLogger;
-        this.retryThreadCount = retryThreadCount;
-        this.requestRateLimit = requestRateLimit;
-        this.credentials = credentials;
-        this.watcherManager = ZKWatcherManager.newBuilder()
-                .name(name)
-                .zkc(this)
-                .statsLogger(statsLogger.scope("watcher_manager"))
-                .build();
-    }
-
-    public List<ACL> getDefaultACL() {
-        if (Credentials.NONE == credentials) {
-            return ZooDefs.Ids.OPEN_ACL_UNSAFE;
-        } else {
-            return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL;
-        }
-    }
-
-    public ZKWatcherManager getWatcherManager() {
-        return watcherManager;
-    }
-
-    /**
-     * Returns the current active ZK connection or establishes a new one if none has yet been
-     * established or a previous connection was disconnected or had its session time out.
-     *
-     * @return a connected ZooKeeper client
-     * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
-     * @throws InterruptedException if interrupted while waiting for a connection to be established
-     * @throws TimeoutException if a connection could not be established within the configured
-     * session timeout
-     */
-    public synchronized ZooKeeper get()
-        throws ZooKeeperConnectionException, InterruptedException {
-
-        try {
-            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
-        } catch (IOException ioe) {
-            throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe);
-        }
-
-        // This indicates that the client was explictly closed
-        if (closed) {
-            throw new ZooKeeperConnectionException("Client " + name + " has already been closed");
-        }
-
-        // the underneath zookeeper is retryable zookeeper
-        if (zooKeeper != null && retryPolicy != null) {
-            if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
-                // the zookeeper client is connected
-                disconnectedStopwatch = null;
-            } else {
-                if (disconnectedStopwatch == null) {
-                    disconnectedStopwatch = Stopwatch.createStarted();
-                } else {
-                    long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS);
-                    if (disconnectedMs > defaultConnectionTimeoutMs) {
-                        closeInternal();
-                        authenticated = false;
-                    }
-                }
-            }
-        }
-
-        if (zooKeeper == null) {
-            zooKeeper = buildZooKeeper();
-            disconnectedStopwatch = null;
-        }
-
-        // In case authenticate throws an exception, the caller can try to recover the client by
-        // calling get again.
-        if (!authenticated) {
-            credentials.authenticate(zooKeeper);
-            authenticated = true;
-        }
-
-        return zooKeeper;
-    }
-
-    private ZooKeeper buildZooKeeper()
-        throws ZooKeeperConnectionException, InterruptedException {
-        Watcher watcher = new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                switch (event.getType()) {
-                    case None:
-                        switch (event.getState()) {
-                            case Expired:
-                                if (null == retryPolicy) {
-                                    LOG.info("ZooKeeper {}' session expired. Event: {}", name, event);
-                                    closeInternal();
-                                }
-                                authenticated = false;
-                                break;
-                            case Disconnected:
-                                if (null == retryPolicy) {
-                                    LOG.info("ZooKeeper {} is disconnected from zookeeper now," +
-                                            " but it is OK unless we received EXPIRED event.", name);
-                                }
-                                // Mark as not authenticated if expired or disconnected. In both cases
-                                // we lose any attached auth info. Relying on Expired/Disconnected is
-                                // sufficient since all Expired/Disconnected events are processed before
-                                // all SyncConnected events, and the underlying member is not updated until
-                                // SyncConnected is received.
-                                authenticated = false;
-                                break;
-                            default:
-                                break;
-                        }
-                }
-
-                try {
-                    for (Watcher watcher : watchers) {
-                        try {
-                            watcher.process(event);
-                        } catch (Throwable t) {
-                            LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t);
-                        }
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t);
-                }
-            }
-        };
-
-        Set<Watcher> watchers = new HashSet<Watcher>();
-        watchers.add(watcher);
-
-        ZooKeeper zk;
-        try {
-            RetryPolicy opRetryPolicy = null == retryPolicy ?
-                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy;
-            RetryPolicy connectRetryPolicy = null == retryPolicy ?
-                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) :
-                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
-            zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder()
-                    .connectString(zooKeeperServers)
-                    .sessionTimeoutMs(sessionTimeoutMs)
-                    .watchers(watchers)
-                    .operationRetryPolicy(opRetryPolicy)
-                    .connectRetryPolicy(connectRetryPolicy)
-                    .statsLogger(statsLogger)
-                    .retryThreadCount(retryThreadCount)
-                    .requestRateLimit(requestRateLimit)
-                    .build();
-        } catch (KeeperException e) {
-            throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
-        } catch (IOException e) {
-            throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
-        }
-        return zk;
-    }
-
-    /**
-     * Clients that need to re-establish state after session expiration can register an
-     * {@code onExpired} command to execute.
-     *
-     * @param onExpired the {@code Command} to register
-     * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
-     *         removal.
-     */
-    public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) {
-        Watcher watcher = new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
-                    try {
-                        onExpired.notifySessionExpired();
-                    } catch (Exception exc) {
-                        // do nothing
-                    }
-                }
-            }
-        };
-        register(watcher);
-        return watcher;
-    }
-
-    /**
-     * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
-     * registered {@code watcher} will remain registered across re-connects and session expiration
-     * events.
-     *
-     * @param watcher the {@code Watcher to register}
-     */
-    public void register(Watcher watcher) {
-        if (null != watcher) {
-            watchers.add(watcher);
-        }
-    }
-
-    /**
-     * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
-     * registered.
-     *
-     * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
-     * @return whether the given {@code Watcher} was found and removed from the active set
-     */
-    public boolean unregister(Watcher watcher) {
-        return null != watcher && watchers.remove(watcher);
-    }
-
-    /**
-     * Closes the current connection if any expiring the current ZooKeeper session.  Any subsequent
-     * calls to this method will no-op until the next successful {@link #get}.
-     */
-    public synchronized void closeInternal() {
-        if (zooKeeper != null) {
-            try {
-                LOG.info("Closing zookeeper client {}.", name);
-                zooKeeper.close();
-                LOG.info("Closed zookeeper client {}.", name);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e);
-            } finally {
-                zooKeeper = null;
-            }
-        }
-    }
-
-    /**
-     * Closes the the underlying zookeeper instance.
-     * Subsequent attempts to {@link #get} will fail
-     */
-    public synchronized void close() {
-        if (closed) {
-            return;
-        }
-        LOG.info("Close zookeeper client {}.", name);
-        closeInternal();
-        // unregister gauges to prevent GC spiral
-        this.watcherManager.unregisterGauges();
-        closed = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
deleted file mode 100644
index 15f1805..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.ZooKeeperClient.Credentials;
-import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-
-/**
- * Builder to build zookeeper client.
- */
-public class ZooKeeperClientBuilder {
-
-    static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClientBuilder.class);
-
-    /**
-     * Create a zookeeper client builder to build zookeeper clients.
-     *
-     * @return zookeeper client builder.
-     */
-    public static ZooKeeperClientBuilder newBuilder() {
-        return new ZooKeeperClientBuilder();
-    }
-
-    // name
-    private String name = "default";
-    // sessionTimeoutMs
-    private int sessionTimeoutMs = -1;
-    // conectionTimeoutMs
-    private int conectionTimeoutMs = -1;
-    // zkServers
-    private String zkServers = null;
-    // retry policy
-    private RetryPolicy retryPolicy = null;
-    // stats logger
-    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-    // retry executor thread count
-    private int retryThreadCount = 1;
-    // zookeeper access requestRateLimit limit
-    private double requestRateLimit = 0;
-    // Did call the zkAclId setter on the builder, used to ensure the setter is set.
-    private boolean zkAclIdSet = false;
-    private String zkAclId;
-
-    // Cached ZooKeeper Client
-    private ZooKeeperClient cachedClient = null;
-
-    private ZooKeeperClientBuilder() {}
-
-    /**
-     * Set zookeeper client name
-     *
-     * @param name zookeeper client name
-     * @return zookeeper client builder
-     */
-    public synchronized ZooKeeperClientBuilder name(String name) {
-        this.name = name;
-        return this;
-    }
-
-    /**
-     * Set zookeeper session timeout in milliseconds.
-     *
-     * @param sessionTimeoutMs
-     *          session timeout in milliseconds.
-     * @return zookeeper client builder.
-     */
-    public synchronized ZooKeeperClientBuilder sessionTimeoutMs(int sessionTimeoutMs) {
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        if (this.conectionTimeoutMs <= 0) {
-            this.conectionTimeoutMs = 2 * sessionTimeoutMs;
-        }
-        return this;
-    }
-
-    public synchronized ZooKeeperClientBuilder retryThreadCount(int retryThreadCount) {
-        this.retryThreadCount = retryThreadCount;
-        return this;
-    }
-
-    public synchronized ZooKeeperClientBuilder requestRateLimit(double requestRateLimit) {
-        this.requestRateLimit = requestRateLimit;
-        return this;
-    }
-
-    /**
-     * Set zookeeper connection timeout in milliseconds
-     *
-     * @param connectionTimeoutMs
-     *          connection timeout ms.
-     * @return builder
-     */
-    public synchronized ZooKeeperClientBuilder connectionTimeoutMs(int connectionTimeoutMs) {
-        this.conectionTimeoutMs = connectionTimeoutMs;
-        return this;
-    }
-
-    /**
-     * Set ZooKeeper Connect String.
-     *
-     * @param zkServers
-     *          zookeeper servers to connect.
-     * @return builder
-     */
-    public synchronized ZooKeeperClientBuilder zkServers(String zkServers) {
-        this.zkServers = zkServers;
-        return this;
-    }
-
-    /**
-     * Set DistributedLog URI.
-     *
-     * @param uri
-     *          distributedlog uri.
-     * @return builder.
-     */
-    public synchronized ZooKeeperClientBuilder uri(URI uri) {
-        this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
-        return this;
-    }
-
-    /**
-     * Build zookeeper client using existing <i>zkc</i> client.
-     *
-     * @param zkc
-     *          zookeeper client.
-     * @return builder
-     */
-    public synchronized ZooKeeperClientBuilder zkc(ZooKeeperClient zkc) {
-        this.cachedClient = zkc;
-        return this;
-    }
-
-    /**
-     * Build zookeeper client with given retry policy <i>retryPolicy</i>.
-     *
-     * @param retryPolicy
-     *          retry policy
-     * @return builder
-     */
-    public synchronized ZooKeeperClientBuilder retryPolicy(RetryPolicy retryPolicy) {
-        this.retryPolicy = retryPolicy;
-        return this;
-    }
-
-    /**
-     * Build zookeeper client with given stats logger <i>statsLogger</i>.
-     *
-     * @param statsLogger
-     *          stats logger to expose zookeeper stats
-     * @return builder
-     */
-    public synchronized ZooKeeperClientBuilder statsLogger(StatsLogger statsLogger) {
-        this.statsLogger = statsLogger;
-        return this;
-    }
-
-    /**
-     * * Build zookeeper client with given zk acl digest id <i>zkAclId</i>.
-     */
-    public synchronized ZooKeeperClientBuilder zkAclId(String zkAclId) {
-        this.zkAclIdSet = true;
-        this.zkAclId = zkAclId;
-        return this;
-    }
-
-    private void validateParameters() {
-        Preconditions.checkNotNull(zkServers, "No zk servers provided.");
-        Preconditions.checkArgument(conectionTimeoutMs > 0,
-                "Invalid connection timeout : %d", conectionTimeoutMs);
-        Preconditions.checkArgument(sessionTimeoutMs > 0,
-                "Invalid session timeout : %d", sessionTimeoutMs);
-        Preconditions.checkNotNull(statsLogger, "No stats logger provided.");
-        Preconditions.checkArgument(zkAclIdSet, "Zookeeper acl id not set.");
-    }
-
-    /**
-     * Build a zookeeper client.
-     *
-     * @return zookeeper client.
-     */
-    public synchronized ZooKeeperClient build() {
-        if (null == cachedClient) {
-            cachedClient = buildClient();
-        }
-        return cachedClient;
-    }
-
-    private ZooKeeperClient buildClient() {
-        validateParameters();
-
-        Credentials credentials = Credentials.NONE;
-        if (null != zkAclId) {
-            credentials = new DigestCredentials(zkAclId, zkAclId);
-        }
-
-        return new ZooKeeperClient(
-                name,
-                sessionTimeoutMs,
-                conectionTimeoutMs,
-                zkServers,
-                retryPolicy,
-                statsLogger,
-                retryThreadCount,
-                requestRateLimit,
-                credentials
-        );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
deleted file mode 100644
index 5fcc87e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-/**
- * Access Control on stream operations
- */
-public interface AccessControlManager {
-
-    /**
-     * Whether allowing writing to a stream.
-     *
-     * @param stream
-     *          Stream to write
-     * @return true if allowing writing to the given stream, otherwise false.
-     */
-    boolean allowWrite(String stream);
-
-    /**
-     * Whether allowing truncating a given stream.
-     *
-     * @param stream
-     *          Stream to truncate
-     * @return true if allowing truncating a given stream.
-     */
-    boolean allowTruncate(String stream);
-
-    /**
-     * Whether allowing deleting a given stream.
-     *
-     * @param stream
-     *          Stream to delete
-     * @return true if allowing deleting a given stream.
-     */
-    boolean allowDelete(String stream);
-
-    /**
-     * Whether allowing proxies to acquire a given stream.
-     *
-     * @param stream
-     *          stream to acquire
-     * @return true if allowing proxies to acquire the given stream.
-     */
-    boolean allowAcquire(String stream);
-
-    /**
-     * Whether allowing proxies to release ownership for a given stream.
-     *
-     * @param stream
-     *          stream to release
-     * @return true if allowing proxies to release a given stream.
-     */
-    boolean allowRelease(String stream);
-
-    /**
-     * Close the access control manager.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
deleted file mode 100644
index e757595..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-public class DefaultAccessControlManager implements AccessControlManager {
-
-    public static final DefaultAccessControlManager INSTANCE = new DefaultAccessControlManager();
-
-    private DefaultAccessControlManager() {
-    }
-
-    @Override
-    public boolean allowWrite(String stream) {
-        return true;
-    }
-
-    @Override
-    public boolean allowTruncate(String stream) {
-        return true;
-    }
-
-    @Override
-    public boolean allowDelete(String stream) {
-        return true;
-    }
-
-    @Override
-    public boolean allowAcquire(String stream) {
-        return true;
-    }
-
-    @Override
-    public boolean allowRelease(String stream) {
-        return true;
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
deleted file mode 100644
index 65109fc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Access Control for distributedlog streams.
- */
-package com.twitter.distributedlog.acl;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
deleted file mode 100644
index 0512907..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
+++ /dev/null
@@ -1,921 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.admin;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ReadUtils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.impl.acl.ZKAccessControl;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.metadata.MetadataUpdater;
-import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.distributedlog.tools.DistributedLogTool;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Admin Tool for DistributedLog.
- */
-public class DistributedLogAdmin extends DistributedLogTool {
-
-    static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
-
-    /**
-     * Fix inprogress segment with lower ledger sequence number.
-     *
-     * @param namespace
-     *          dl namespace
-     * @param metadataUpdater
-     *          metadata updater.
-     * @param streamName
-     *          stream name.
-     * @param verbose
-     *          print verbose messages.
-     * @param interactive
-     *          is confirmation needed before executing actual action.
-     * @throws IOException
-     */
-    public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
-                                                                   final MetadataUpdater metadataUpdater,
-                                                                   final String streamName,
-                                                                   final boolean verbose,
-                                                                   final boolean interactive) throws IOException {
-        DistributedLogManager dlm = namespace.openLog(streamName);
-        try {
-            List<LogSegmentMetadata> segments = dlm.getLogSegments();
-            if (verbose) {
-                System.out.println("LogSegments for " + streamName + " : ");
-                for (LogSegmentMetadata segment : segments) {
-                    System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment);
-                }
-            }
-            LOG.info("Get log segments for {} : {}", streamName, segments);
-            // validate log segments
-            long maxCompletedLogSegmentSequenceNumber = -1L;
-            LogSegmentMetadata inprogressSegment = null;
-            for (LogSegmentMetadata segment : segments) {
-                if (!segment.isInProgress()) {
-                    maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber, segment.getLogSegmentSequenceNumber());
-                } else {
-                    // we already found an inprogress segment
-                    if (null != inprogressSegment) {
-                        throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments);
-                    }
-                    inprogressSegment = segment;
-                }
-            }
-            if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedLogSegmentSequenceNumber) {
-                // nothing to fix
-                return;
-            }
-            final long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1;
-            if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
-                return;
-            }
-            final LogSegmentMetadata newSegment =
-                    FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newLogSegmentSequenceNumber));
-            LOG.info("Fixed {} : {} -> {} ",
-                     new Object[] { streamName, inprogressSegment, newSegment });
-            if (verbose) {
-                System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName()
-                                   + " -> " + newSegment.getZNodeName());
-                System.out.println("\t old: " + inprogressSegment);
-                System.out.println("\t new: " + newSegment);
-                System.out.println();
-            }
-        } finally {
-            dlm.close();
-        }
-    }
-
-    private static class LogSegmentCandidate {
-        final LogSegmentMetadata metadata;
-        final LogRecordWithDLSN lastRecord;
-
-        LogSegmentCandidate(LogSegmentMetadata metadata, LogRecordWithDLSN lastRecord) {
-            this.metadata = metadata;
-            this.lastRecord = lastRecord;
-        }
-
-        @Override
-        public String toString() {
-            return "LogSegmentCandidate[ metadata = " + metadata + ", last record = " + lastRecord + " ]";
-        }
-
-    }
-
-    private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR =
-            new Comparator<LogSegmentCandidate>() {
-                @Override
-                public int compare(LogSegmentCandidate o1, LogSegmentCandidate o2) {
-                    return LogSegmentMetadata.COMPARATOR.compare(o1.metadata, o2.metadata);
-                }
-            };
-
-    private static class StreamCandidate {
-
-        final String streamName;
-        final SortedSet<LogSegmentCandidate> segmentCandidates =
-                new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR);
-
-        StreamCandidate(String streamName) {
-            this.streamName = streamName;
-        }
-
-        synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) {
-            segmentCandidates.add(segmentCandidate);
-        }
-
-        @Override
-        public String toString() {
-            return "StreamCandidate[ name = " + streamName + ", segments = " + segmentCandidates + " ]";
-        }
-    }
-
-    public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final DistributedLogNamespace namespace,
-                                                 final MetadataUpdater metadataUpdater,
-                                                 final OrderedScheduler scheduler,
-                                                 final boolean verbose,
-                                                 final boolean interactive) throws IOException {
-        checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
-    }
-
-    public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final DistributedLogNamespace namespace,
-                                                 final MetadataUpdater metadataUpdater,
-                                                 final OrderedScheduler scheduler,
-                                                 final boolean verbose,
-                                                 final boolean interactive,
-                                                 final int concurrency) throws IOException {
-        Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
-        // 0. getting streams under a given uri.
-        Iterator<String> streamsIter = namespace.getLogs();
-        List<String> streams = Lists.newArrayList();
-        while (streamsIter.hasNext()) {
-            streams.add(streamsIter.next());
-        }
-        if (verbose) {
-            System.out.println("- 0. checking streams under " + uri);
-        }
-        if (streams.size() == 0) {
-            System.out.println("+ 0. nothing to check. quit.");
-            return;
-        }
-        Map<String, StreamCandidate> streamCandidates =
-                checkStreams(namespace, streams, scheduler, concurrency);
-        if (verbose) {
-            System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
-        }
-        if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + streamCandidates.size() + " corrupted streams (Y/N) : ")) {
-            return;
-        }
-        if (verbose) {
-            System.out.println("- 1. repairing " + streamCandidates.size() + " corrupted streams.");
-        }
-        for (StreamCandidate candidate : streamCandidates.values()) {
-            if (!repairStream(metadataUpdater, candidate, verbose, interactive)) {
-                if (verbose) {
-                    System.out.println("* 1. aborted repairing corrupted streams.");
-                }
-                return;
-            }
-        }
-        if (verbose) {
-            System.out.println("+ 1. repaired " + streamCandidates.size() + " corrupted streams.");
-        }
-    }
-
-    private static Map<String, StreamCandidate> checkStreams(
-            final DistributedLogNamespace namespace,
-            final Collection<String> streams,
-            final OrderedScheduler scheduler,
-            final int concurrency) throws IOException {
-        final LinkedBlockingQueue<String> streamQueue =
-                new LinkedBlockingQueue<String>();
-        streamQueue.addAll(streams);
-        final Map<String, StreamCandidate> candidateMap =
-                new ConcurrentSkipListMap<String, StreamCandidate>();
-        final AtomicInteger numPendingStreams = new AtomicInteger(streams.size());
-        final CountDownLatch doneLatch = new CountDownLatch(1);
-        Runnable checkRunnable = new Runnable() {
-            @Override
-            public void run() {
-                while (!streamQueue.isEmpty()) {
-                    String stream;
-                    try {
-                        stream = streamQueue.take();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        break;
-                    }
-                    StreamCandidate candidate;
-                    try {
-                        LOG.info("Checking stream {}.", stream);
-                        candidate = checkStream(namespace, stream, scheduler);
-                        LOG.info("Checked stream {} - {}.", stream, candidate);
-                    } catch (IOException e) {
-                        LOG.error("Error on checking stream {} : ", stream, e);
-                        doneLatch.countDown();
-                        break;
-                    }
-                    if (null != candidate) {
-                        candidateMap.put(stream, candidate);
-                    }
-                    if (numPendingStreams.decrementAndGet() == 0) {
-                        doneLatch.countDown();
-                    }
-                }
-            }
-        };
-        Thread[] threads = new Thread[concurrency];
-        for (int i = 0; i < concurrency; i++) {
-            threads[i] = new Thread(checkRunnable, "check-thread-" + i);
-            threads[i].start();
-        }
-        try {
-            doneLatch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        if (numPendingStreams.get() != 0) {
-            throw new IOException(numPendingStreams.get() + " streams left w/o checked");
-        }
-        for (int i = 0; i < concurrency; i++) {
-            threads[i].interrupt();
-            try {
-                threads[i].join();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        }
-        return candidateMap;
-    }
-
-    private static StreamCandidate checkStream(
-            final DistributedLogNamespace namespace,
-            final String streamName,
-            final OrderedScheduler scheduler) throws IOException {
-        DistributedLogManager dlm = namespace.openLog(streamName);
-        try {
-            List<LogSegmentMetadata> segments = dlm.getLogSegments();
-            if (segments.isEmpty()) {
-                return null;
-            }
-            List<Future<LogSegmentCandidate>> futures =
-                    new ArrayList<Future<LogSegmentCandidate>>(segments.size());
-            for (LogSegmentMetadata segment : segments) {
-                futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
-            }
-            List<LogSegmentCandidate> segmentCandidates;
-            try {
-                segmentCandidates = Await.result(Future.collect(futures));
-            } catch (Exception e) {
-                throw new IOException("Failed on checking stream " + streamName, e);
-            }
-            StreamCandidate streamCandidate = new StreamCandidate(streamName);
-            for (LogSegmentCandidate segmentCandidate: segmentCandidates) {
-                if (null != segmentCandidate) {
-                    streamCandidate.addLogSegmentCandidate(segmentCandidate);
-                }
-            }
-            if (streamCandidate.segmentCandidates.isEmpty()) {
-                return null;
-            }
-            return streamCandidate;
-        } finally {
-            dlm.close();
-        }
-    }
-
-    private static Future<LogSegmentCandidate> checkLogSegment(
-            final DistributedLogNamespace namespace,
-            final String streamName,
-            final LogSegmentMetadata metadata,
-            final OrderedScheduler scheduler) {
-        if (metadata.isInProgress()) {
-            return Future.value(null);
-        }
-
-        final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
-                .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
-        return ReadUtils.asyncReadLastRecord(
-                streamName,
-                metadata,
-                true,
-                false,
-                true,
-                4,
-                16,
-                new AtomicInteger(0),
-                scheduler,
-                entryStore
-        ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
-            @Override
-            public LogSegmentCandidate apply(LogRecordWithDLSN record) {
-                if (null != record &&
-                    (record.getDlsn().compareTo(metadata.getLastDLSN()) > 0 ||
-                     record.getTransactionId() > metadata.getLastTxId() ||
-                     !metadata.isRecordPositionWithinSegmentScope(record))) {
-                    return new LogSegmentCandidate(metadata, record);
-                } else {
-                    return null;
-                }
-            }
-        });
-    }
-
-    private static boolean repairStream(MetadataUpdater metadataUpdater,
-                                        StreamCandidate streamCandidate,
-                                        boolean verbose,
-                                        boolean interactive) throws IOException {
-        if (verbose) {
-            System.out.println("Stream " + streamCandidate.streamName + " : ");
-            for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
-                System.out.println("  " + segmentCandidate.metadata.getLogSegmentSequenceNumber()
-                        + " : metadata = " + segmentCandidate.metadata + ", last dlsn = "
-                        + segmentCandidate.lastRecord.getDlsn());
-            }
-            System.out.println("-------------------------------------------");
-        }
-        if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) {
-            return false;
-        }
-        for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
-            LogSegmentMetadata newMetadata = FutureUtils.result(
-                    metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord));
-            if (verbose) {
-                System.out.println("  Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : ");
-                System.out.println("    old metadata : " + segmentCandidate.metadata);
-                System.out.println("    new metadata : " + newMetadata);
-            }
-        }
-        if (verbose) {
-            System.out.println("-------------------------------------------");
-        }
-        return true;
-    }
-
-    //
-    // Commands
-    //
-
-    /**
-     * Unbind the bookkeeper environment for a given distributedlog uri.
-     *
-     * TODO: move unbind operation to namespace driver
-     */
-    class UnbindCommand extends OptsCommand {
-
-        Options options = new Options();
-
-        UnbindCommand() {
-            super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
-            options.addOption("f", "force", false, "Force unbinding without prompt.");
-        }
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "unbind [options] <distributedlog uri>";
-        }
-
-        @Override
-        protected int runCmd(CommandLine cmdline) throws Exception {
-            String[] args = cmdline.getArgs();
-            if (args.length <= 0) {
-                System.err.println("No distributedlog uri specified.");
-                printUsage();
-                return -1;
-            }
-            boolean force = cmdline.hasOption("f");
-            URI uri = URI.create(args[0]);
-            // resolving the uri to see if there is another bindings in this uri.
-            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri)
-                    .sessionTimeoutMs(10000).build();
-            BKDLConfig bkdlConfig;
-            try {
-                bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-            } catch (IOException ie) {
-                bkdlConfig = null;
-            }
-            if (null == bkdlConfig) {
-                System.out.println("No bookkeeper is bound to " + uri);
-                return 0;
-            } else {
-                System.out.println("There is bookkeeper bound to " + uri + " : ");
-                System.out.println("");
-                System.out.println(bkdlConfig.toString());
-                System.out.println("");
-                if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) {
-                    return 0;
-                }
-            }
-            DLMetadata.unbind(uri);
-            System.out.println("Unbound on " + uri + ".");
-            return 0;
-        }
-    }
-
-    /**
-     * Bind Command to bind bookkeeper environment for a given distributed uri.
-     *
-     * TODO: move bind to namespace driver
-     */
-    class BindCommand extends OptsCommand {
-
-        Options options = new Options();
-
-        BindCommand() {
-            super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
-            options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
-            options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
-            options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers.");
-            options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers.");
-            options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers.");
-            options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
-            options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
-            options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade");
-            options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
-            options.addOption("f", "force", false, "Force binding without prompt.");
-            options.addOption("c", "creation", false, "Whether is it a creation binding.");
-            options.addOption("q", "query", false, "Query the bookkeeper bindings");
-        }
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "bind [options] <distributedlog uri>";
-        }
-
-        @Override
-        protected int runCmd(CommandLine cmdline) throws Exception {
-            boolean isQuery = cmdline.hasOption("q");
-            if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
-                System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
-                printUsage();
-                return -1;
-            }
-            String[] args = cmdline.getArgs();
-            if (args.length <= 0) {
-                System.err.println("No distributedlog uri specified.");
-                printUsage();
-                return -1;
-            }
-            boolean force = cmdline.hasOption("f");
-            boolean creation = cmdline.hasOption("c");
-            String bkLedgersPath = cmdline.getOptionValue("l");
-            String bkZkServersForWriter = cmdline.getOptionValue("s");
-            boolean sanityCheckTxnID =
-                    !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
-            boolean encodeRegionID =
-                    cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));
-
-            String bkZkServersForReader;
-            if (cmdline.hasOption("bkzr")) {
-                bkZkServersForReader = cmdline.getOptionValue("bkzr");
-            } else {
-                bkZkServersForReader = bkZkServersForWriter;
-            }
-
-            URI uri = URI.create(args[0]);
-
-            String dlZkServersForWriter;
-            String dlZkServersForReader;
-            if (cmdline.hasOption("dlzw")) {
-                dlZkServersForWriter = cmdline.getOptionValue("dlzw");
-            } else {
-                dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
-            }
-            if (cmdline.hasOption("dlzr")) {
-                dlZkServersForReader = cmdline.getOptionValue("dlzr");
-            } else {
-                dlZkServersForReader = dlZkServersForWriter;
-            }
-
-            // resolving the uri to see if there is another bindings in this uri.
-            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
-                    .sessionTimeoutMs(10000).build();
-            try {
-                BKDLConfig newBKDLConfig =
-                        new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
-                                       bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
-                                .setSanityCheckTxnID(sanityCheckTxnID)
-                                .setEncodeRegionID(encodeRegionID);
-
-                if (cmdline.hasOption("seqno")) {
-                    newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno")));
-                }
-
-                if (cmdline.hasOption("fns")) {
-                    newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
-                }
-
-                BKDLConfig bkdlConfig;
-                try {
-                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-                } catch (IOException ie) {
-                    bkdlConfig = null;
-                }
-                if (null == bkdlConfig) {
-                    System.out.println("No bookkeeper is bound to " + uri);
-                } else {
-                    System.out.println("There is bookkeeper bound to " + uri + " : ");
-                    System.out.println("");
-                    System.out.println(bkdlConfig.toString());
-                    System.out.println("");
-                    if (!isQuery) {
-                        if (newBKDLConfig.equals(bkdlConfig)) {
-                            System.out.println("No bookkeeper binding needs to be updated. Quit.");
-                            return 0;
-                        } else if(!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
-                            System.out.println("You can't turn a federated namespace back to non-federated.");
-                            return 0;
-                        } else {
-                            if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
-                                        + " with new bookkeeper instance :\n" + newBKDLConfig)) {
-                                return 0;
-                            }
-                        }
-                    }
-                }
-                if (isQuery) {
-                    System.out.println("Done.");
-                    return 0;
-                }
-                DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
-                if (creation) {
-                    try {
-                        dlMetadata.create(uri);
-                        System.out.println("Created binding on " + uri + ".");
-                    } catch (IOException ie) {
-                        System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
-                    }
-                } else {
-                    try {
-                        dlMetadata.update(uri);
-                        System.out.println("Updated binding on " + uri + " : ");
-                        System.out.println("");
-                        System.out.println(newBKDLConfig.toString());
-                        System.out.println("");
-                    } catch (IOException ie) {
-                        System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
-                    }
-                }
-                if (newBKDLConfig.isFederatedNamespace()) {
-                    try {
-                        FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
-                    } catch (KeeperException.NodeExistsException nee) {
-                        // ignore node exists exception
-                    }
-                }
-                return 0;
-            } finally {
-                zkc.close();
-            }
-        }
-    }
-
-    static class RepairSeqNoCommand extends PerDLCommand {
-
-        boolean dryrun = false;
-        boolean verbose = false;
-        final List<String> streams = new ArrayList<String>();
-
-        RepairSeqNoCommand() {
-            super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
-            options.addOption("d", "dryrun", false, "Dry run without repairing");
-            options.addOption("l", "list", true, "List of streams to repair, separated by comma");
-            options.addOption("v", "verbose", false, "Print verbose messages");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            dryrun = cmdline.hasOption("d");
-            verbose = cmdline.hasOption("v");
-            force = !dryrun && cmdline.hasOption("f");
-            if (!cmdline.hasOption("l")) {
-                throw new ParseException("No streams provided to repair");
-            }
-            String streamsList = cmdline.getOptionValue("l");
-            Collections.addAll(streams, streamsList.split(","));
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            MetadataUpdater metadataUpdater = dryrun ?
-                    new DryrunLogSegmentMetadataStoreUpdater(getConf(),
-                            getLogSegmentMetadataStore()) :
-                    LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
-                            getLogSegmentMetadataStore());
-            System.out.println("List of streams : ");
-            System.out.println(streams);
-            if (!IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):")) {
-                return -1;
-            }
-            for (String stream : streams) {
-                fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce());
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "repairseqno [options]";
-        }
-    }
-
-    static class DLCKCommand extends PerDLCommand {
-
-        boolean dryrun = false;
-        boolean verbose = false;
-        int concurrency = 1;
-
-        DLCKCommand() {
-            super("dlck", "Check and repair a distributedlog namespace");
-            options.addOption("d", "dryrun", false, "Dry run without repairing");
-            options.addOption("v", "verbose", false, "Print verbose messages");
-            options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            dryrun = cmdline.hasOption("d");
-            verbose = cmdline.hasOption("v");
-            if (cmdline.hasOption("cy")) {
-                try {
-                    concurrency = Integer.parseInt(cmdline.getOptionValue("cy"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
-                }
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            MetadataUpdater metadataUpdater = dryrun ?
-                    new DryrunLogSegmentMetadataStoreUpdater(getConf(),
-                            getLogSegmentMetadataStore()) :
-                    LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
-                            getLogSegmentMetadataStore());
-            OrderedScheduler scheduler = OrderedScheduler.newBuilder()
-                    .name("dlck-scheduler")
-                    .corePoolSize(Runtime.getRuntime().availableProcessors())
-                    .build();
-            ExecutorService executorService = Executors.newCachedThreadPool();
-            try {
-                checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
-                                          verbose, !getForce(), concurrency);
-            } finally {
-                SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "dlck [options]";
-        }
-    }
-
-    static class DeleteStreamACLCommand extends PerDLCommand {
-
-        String stream = null;
-
-        DeleteStreamACLCommand() {
-            super("delete_stream_acl", "Delete ACL for a given stream");
-            options.addOption("s", "stream", true, "Stream to set ACL");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("s")) {
-                throw new ParseException("No stream to set ACL");
-            }
-            stream = cmdline.getOptionValue("s");
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
-            if (null == bkdlConfig.getACLRootPath()) {
-                // acl isn't enabled for this namespace.
-                System.err.println("ACL isn't enabled for namespace " + getUri());
-                return -1;
-            }
-            String zkPath = getUri() + "/" + bkdlConfig.getACLRootPath() + "/" + stream;
-            ZKAccessControl.delete(getZooKeeperClient(), zkPath);
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return null;
-        }
-    }
-
-    static class SetStreamACLCommand extends SetACLCommand {
-
-        String stream = null;
-
-        SetStreamACLCommand() {
-            super("set_stream_acl", "Set Default ACL for a given stream");
-            options.addOption("s", "stream", true, "Stream to set ACL");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("s")) {
-                throw new ParseException("No stream to set ACL");
-            }
-            stream = cmdline.getOptionValue("s");
-        }
-
-        @Override
-        protected String getZKPath(String zkRootPath) {
-            return zkRootPath + "/" + stream;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "set_stream_acl [options]";
-        }
-    }
-
-    static class SetDefaultACLCommand extends SetACLCommand {
-
-        SetDefaultACLCommand() {
-            super("set_default_acl", "Set Default ACL for a namespace");
-        }
-
-        @Override
-        protected String getZKPath(String zkRootPath) {
-            return zkRootPath;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "set_default_acl [options]";
-        }
-    }
-
-    static abstract class SetACLCommand extends PerDLCommand {
-
-        boolean denyWrite = false;
-        boolean denyTruncate = false;
-        boolean denyDelete = false;
-        boolean denyAcquire = false;
-        boolean denyRelease = false;
-
-        protected SetACLCommand(String name, String description) {
-            super(name, description);
-            options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
-            options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
-            options.addOption("dd", "deny-delete", false, "Deny delete requests");
-            options.addOption("da", "deny-acquire", false, "Deny acquire requests");
-            options.addOption("dr", "deny-release", false, "Deny release requests");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            denyWrite = cmdline.hasOption("dw");
-            denyTruncate = cmdline.hasOption("dt");
-            denyDelete = cmdline.hasOption("dd");
-            denyAcquire = cmdline.hasOption("da");
-            denyRelease = cmdline.hasOption("dr");
-        }
-
-        protected abstract String getZKPath(String zkRootPath);
-
-        protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
-            ZKAccessControl accessControl;
-            try {
-                accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null));
-            } catch (KeeperException.NoNodeException nne) {
-                accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath);
-            }
-            return accessControl;
-        }
-
-        protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
-            String zkPath = accessControl.getZKPath();
-            if (null == zkc.get().exists(zkPath, false)) {
-                accessControl.create(zkc);
-            } else {
-                accessControl.update(zkc);
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
-            if (null == bkdlConfig.getACLRootPath()) {
-                // acl isn't enabled for this namespace.
-                System.err.println("ACL isn't enabled for namespace " + getUri());
-                return -1;
-            }
-            String zkPath = getZKPath(getUri().getPath() + "/" + bkdlConfig.getACLRootPath());
-            ZKAccessControl accessControl = getZKAccessControl(getZooKeeperClient(), zkPath);
-            AccessControlEntry acl = accessControl.getAccessControlEntry();
-            acl.setDenyWrite(denyWrite);
-            acl.setDenyTruncate(denyTruncate);
-            acl.setDenyDelete(denyDelete);
-            acl.setDenyAcquire(denyAcquire);
-            acl.setDenyRelease(denyRelease);
-            setZKAccessControl(getZooKeeperClient(), accessControl);
-            return 0;
-        }
-
-    }
-
-    public DistributedLogAdmin() {
-        super();
-        commands.clear();
-        addCommand(new HelpCommand());
-        addCommand(new BindCommand());
-        addCommand(new UnbindCommand());
-        addCommand(new RepairSeqNoCommand());
-        addCommand(new DLCKCommand());
-        addCommand(new SetDefaultACLCommand());
-        addCommand(new SetStreamACLCommand());
-        addCommand(new DeleteStreamACLCommand());
-    }
-
-    @Override
-    protected String getName() {
-        return "dlog_admin";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
deleted file mode 100644
index a7d6adb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Admin Tools for DistributedLog
- */
-package com.twitter.distributedlog.admin;