You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:32 UTC
[27/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
deleted file mode 100644
index 74cd6cf..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.zk.ZKWatcherManager;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}.
- * It handles retries on session expiry and provides a watcher manager {@link ZKWatcherManager}.
- *
- * <h3>Metrics</h3>
- * <ul>
- * <li> zookeeper operation stats are exposed under scope <code>zk</code> by
- * {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}
- * <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by
- * {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase}
- * <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code>
- * </ul>
- */
-public class ZooKeeperClient {
-
- public static interface Credentials {
-
- Credentials NONE = new Credentials() {
- @Override
- public void authenticate(ZooKeeper zooKeeper) {
- // noop
- }
- };
-
- void authenticate(ZooKeeper zooKeeper);
- }
-
- public static class DigestCredentials implements Credentials {
-
- String username;
- String password;
-
- public DigestCredentials(String username, String password) {
- this.username = username;
- this.password = password;
- }
-
- @Override
- public void authenticate(ZooKeeper zooKeeper) {
- zooKeeper.addAuthInfo("digest", String.format("%s:%s", username, password).getBytes(UTF_8));
- }
- }
-
- public interface ZooKeeperSessionExpireNotifier {
- void notifySessionExpired();
- }
-
- /**
- * Indicates an error connecting to a zookeeper cluster.
- */
- public static class ZooKeeperConnectionException extends IOException {
- private static final long serialVersionUID = 6682391687004819361L;
-
- public ZooKeeperConnectionException(String message) {
- super(message);
- }
-
- public ZooKeeperConnectionException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName());
-
- private final String name;
- private final int sessionTimeoutMs;
- private final int defaultConnectionTimeoutMs;
- private final String zooKeeperServers;
- // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
- // made from within long synchronized blocks.
- private volatile ZooKeeper zooKeeper = null;
- private final RetryPolicy retryPolicy;
- private final StatsLogger statsLogger;
- private final int retryThreadCount;
- private final double requestRateLimit;
- private final Credentials credentials;
- private volatile boolean authenticated = false;
- private Stopwatch disconnectedStopwatch = null;
-
- private boolean closed = false;
-
- final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
-
- // watcher manager to manage watchers
- private final ZKWatcherManager watcherManager;
-
- /**
- * Creates an unconnected client that will lazily attempt to connect on the first call to
- * {@link #get}. This constructor takes no credentials argument; it passes
- * {@code Credentials.NONE}, whose {@code authenticate} is a no-op.
- *
- * @param sessionTimeoutMs
- * ZK session timeout in milliseconds
- * @param connectionTimeoutMs
- * ZK connection timeout in milliseconds
- * @param zooKeeperServers
- * the set of servers forming the ZK cluster
- */
- ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) {
- this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0,
- Credentials.NONE);
- }
-
- ZooKeeperClient(String name,
- int sessionTimeoutMs,
- int connectionTimeoutMs,
- String zooKeeperServers,
- RetryPolicy retryPolicy,
- StatsLogger statsLogger,
- int retryThreadCount,
- double requestRateLimit,
- Credentials credentials) {
- this.name = name;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.zooKeeperServers = zooKeeperServers;
- this.defaultConnectionTimeoutMs = connectionTimeoutMs;
- this.retryPolicy = retryPolicy;
- this.statsLogger = statsLogger;
- this.retryThreadCount = retryThreadCount;
- this.requestRateLimit = requestRateLimit;
- this.credentials = credentials;
- this.watcherManager = ZKWatcherManager.newBuilder()
- .name(name)
- .zkc(this)
- .statsLogger(statsLogger.scope("watcher_manager"))
- .build();
- }
-
- public List<ACL> getDefaultACL() {
- if (Credentials.NONE == credentials) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
- } else {
- return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL;
- }
- }
-
- public ZKWatcherManager getWatcherManager() {
- return watcherManager;
- }
-
- /**
- * Returns the current active ZK connection or establishes a new one if none has yet been
- * established or a previous connection was disconnected or had its session time out.
- *
- * @return a connected ZooKeeper client
- * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
- * @throws InterruptedException if interrupted while waiting for a connection to be established
- * @throws TimeoutException if a connection could not be established within the configured
- * session timeout
- */
- public synchronized ZooKeeper get()
- throws ZooKeeperConnectionException, InterruptedException {
-
- try {
- FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
- } catch (IOException ioe) {
- throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe);
- }
-
- // This indicates that the client was explicitly closed
- if (closed) {
- throw new ZooKeeperConnectionException("Client " + name + " has already been closed");
- }
-
- // the underlying zookeeper is a retryable zookeeper
- if (zooKeeper != null && retryPolicy != null) {
- if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
- // the zookeeper client is connected
- disconnectedStopwatch = null;
- } else {
- if (disconnectedStopwatch == null) {
- disconnectedStopwatch = Stopwatch.createStarted();
- } else {
- long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS);
- if (disconnectedMs > defaultConnectionTimeoutMs) {
- closeInternal();
- authenticated = false;
- }
- }
- }
- }
-
- if (zooKeeper == null) {
- zooKeeper = buildZooKeeper();
- disconnectedStopwatch = null;
- }
-
- // In case authenticate throws an exception, the caller can try to recover the client by
- // calling get again.
- if (!authenticated) {
- credentials.authenticate(zooKeeper);
- authenticated = true;
- }
-
- return zooKeeper;
- }
-
- private ZooKeeper buildZooKeeper()
- throws ZooKeeperConnectionException, InterruptedException {
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case None:
- switch (event.getState()) {
- case Expired:
- if (null == retryPolicy) {
- LOG.info("ZooKeeper {}' session expired. Event: {}", name, event);
- closeInternal();
- }
- authenticated = false;
- break;
- case Disconnected:
- if (null == retryPolicy) {
- LOG.info("ZooKeeper {} is disconnected from zookeeper now," +
- " but it is OK unless we received EXPIRED event.", name);
- }
- // Mark as not authenticated if expired or disconnected. In both cases
- // we lose any attached auth info. Relying on Expired/Disconnected is
- // sufficient since all Expired/Disconnected events are processed before
- // all SyncConnected events, and the underlying member is not updated until
- // SyncConnected is received.
- authenticated = false;
- break;
- default:
- break;
- }
- }
-
- try {
- for (Watcher watcher : watchers) {
- try {
- watcher.process(event);
- } catch (Throwable t) {
- LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t);
- }
- }
- } catch (Throwable t) {
- LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t);
- }
- }
- };
-
- Set<Watcher> watchers = new HashSet<Watcher>();
- watchers.add(watcher);
-
- ZooKeeper zk;
- try {
- RetryPolicy opRetryPolicy = null == retryPolicy ?
- new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy;
- RetryPolicy connectRetryPolicy = null == retryPolicy ?
- new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) :
- new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
- zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder()
- .connectString(zooKeeperServers)
- .sessionTimeoutMs(sessionTimeoutMs)
- .watchers(watchers)
- .operationRetryPolicy(opRetryPolicy)
- .connectRetryPolicy(connectRetryPolicy)
- .statsLogger(statsLogger)
- .retryThreadCount(retryThreadCount)
- .requestRateLimit(requestRateLimit)
- .build();
- } catch (KeeperException e) {
- throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
- } catch (IOException e) {
- throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
- }
- return zk;
- }
-
- /**
- * Clients that need to re-establish state after session expiration can register an
- * {@code onExpired} command to execute.
- *
- * @param onExpired the {@link ZooKeeperSessionExpireNotifier} to register
- * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
- * removal.
- */
- public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) {
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
- try {
- onExpired.notifySessionExpired();
- } catch (Exception exc) {
- // do nothing
- }
- }
- }
- };
- register(watcher);
- return watcher;
- }
-
- /**
- * Clients that need to register a top-level {@code Watcher} should do so using this method. The
- * registered {@code watcher} will remain registered across re-connects and session expiration
- * events.
- *
- * @param watcher the {@code Watcher} to register
- */
- public void register(Watcher watcher) {
- if (null != watcher) {
- watchers.add(watcher);
- }
- }
-
- /**
- * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
- * registered.
- *
- * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
- * @return whether the given {@code Watcher} was found and removed from the active set
- */
- public boolean unregister(Watcher watcher) {
- return null != watcher && watchers.remove(watcher);
- }
-
- /**
- * Closes the current connection, if any, expiring the current ZooKeeper session. Any subsequent
- * calls to this method are no-ops until the next successful {@link #get}.
- */
- public synchronized void closeInternal() {
- if (zooKeeper != null) {
- try {
- LOG.info("Closing zookeeper client {}.", name);
- zooKeeper.close();
- LOG.info("Closed zookeeper client {}.", name);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e);
- } finally {
- zooKeeper = null;
- }
- }
- }
-
- /**
- * Closes the underlying zookeeper instance.
- * Subsequent attempts to {@link #get} will fail.
- */
- public synchronized void close() {
- if (closed) {
- return;
- }
- LOG.info("Close zookeeper client {}.", name);
- closeInternal();
- // unregister gauges to prevent GC spiral
- this.watcherManager.unregisterGauges();
- closed = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
deleted file mode 100644
index 15f1805..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.ZooKeeperClient.Credentials;
-import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-
-/**
- * Builder to build zookeeper client.
- */
-public class ZooKeeperClientBuilder {
-
- static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClientBuilder.class);
-
- /**
- * Create a zookeeper client builder to build zookeeper clients.
- *
- * @return zookeeper client builder.
- */
- public static ZooKeeperClientBuilder newBuilder() {
- return new ZooKeeperClientBuilder();
- }
-
- // name
- private String name = "default";
- // sessionTimeoutMs
- private int sessionTimeoutMs = -1;
- // conectionTimeoutMs
- private int conectionTimeoutMs = -1;
- // zkServers
- private String zkServers = null;
- // retry policy
- private RetryPolicy retryPolicy = null;
- // stats logger
- private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
- // retry executor thread count
- private int retryThreadCount = 1;
- // zookeeper access request rate limit
- private double requestRateLimit = 0;
- // Whether zkAclId(...) was called on the builder; validateParameters() rejects the build otherwise.
- private boolean zkAclIdSet = false;
- private String zkAclId;
-
- // Cached ZooKeeper Client
- private ZooKeeperClient cachedClient = null;
-
- private ZooKeeperClientBuilder() {}
-
- /**
- * Set zookeeper client name
- *
- * @param name zookeeper client name
- * @return zookeeper client builder
- */
- public synchronized ZooKeeperClientBuilder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Set zookeeper session timeout in milliseconds.
- *
- * @param sessionTimeoutMs
- * session timeout in milliseconds.
- * @return zookeeper client builder.
- */
- public synchronized ZooKeeperClientBuilder sessionTimeoutMs(int sessionTimeoutMs) {
- this.sessionTimeoutMs = sessionTimeoutMs;
- if (this.conectionTimeoutMs <= 0) {
- this.conectionTimeoutMs = 2 * sessionTimeoutMs;
- }
- return this;
- }
-
- public synchronized ZooKeeperClientBuilder retryThreadCount(int retryThreadCount) {
- this.retryThreadCount = retryThreadCount;
- return this;
- }
-
- public synchronized ZooKeeperClientBuilder requestRateLimit(double requestRateLimit) {
- this.requestRateLimit = requestRateLimit;
- return this;
- }
-
- /**
- * Set zookeeper connection timeout in milliseconds
- *
- * @param connectionTimeoutMs
- * connection timeout ms.
- * @return builder
- */
- public synchronized ZooKeeperClientBuilder connectionTimeoutMs(int connectionTimeoutMs) {
- this.conectionTimeoutMs = connectionTimeoutMs;
- return this;
- }
-
- /**
- * Set ZooKeeper Connect String.
- *
- * @param zkServers
- * zookeeper servers to connect.
- * @return builder
- */
- public synchronized ZooKeeperClientBuilder zkServers(String zkServers) {
- this.zkServers = zkServers;
- return this;
- }
-
- /**
- * Set DistributedLog URI.
- *
- * @param uri
- * distributedlog uri.
- * @return builder.
- */
- public synchronized ZooKeeperClientBuilder uri(URI uri) {
- this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
- return this;
- }
-
- /**
- * Build zookeeper client using existing <i>zkc</i> client.
- *
- * @param zkc
- * zookeeper client.
- * @return builder
- */
- public synchronized ZooKeeperClientBuilder zkc(ZooKeeperClient zkc) {
- this.cachedClient = zkc;
- return this;
- }
-
- /**
- * Build zookeeper client with given retry policy <i>retryPolicy</i>.
- *
- * @param retryPolicy
- * retry policy
- * @return builder
- */
- public synchronized ZooKeeperClientBuilder retryPolicy(RetryPolicy retryPolicy) {
- this.retryPolicy = retryPolicy;
- return this;
- }
-
- /**
- * Build zookeeper client with given stats logger <i>statsLogger</i>.
- *
- * @param statsLogger
- * stats logger to expose zookeeper stats
- * @return builder
- */
- public synchronized ZooKeeperClientBuilder statsLogger(StatsLogger statsLogger) {
- this.statsLogger = statsLogger;
- return this;
- }
-
- /**
- * Build zookeeper client with given zk acl digest id <i>zkAclId</i>.
- */
- public synchronized ZooKeeperClientBuilder zkAclId(String zkAclId) {
- this.zkAclIdSet = true;
- this.zkAclId = zkAclId;
- return this;
- }
-
- private void validateParameters() {
- Preconditions.checkNotNull(zkServers, "No zk servers provided.");
- Preconditions.checkArgument(conectionTimeoutMs > 0,
- "Invalid connection timeout : %d", conectionTimeoutMs);
- Preconditions.checkArgument(sessionTimeoutMs > 0,
- "Invalid session timeout : %d", sessionTimeoutMs);
- Preconditions.checkNotNull(statsLogger, "No stats logger provided.");
- Preconditions.checkArgument(zkAclIdSet, "Zookeeper acl id not set.");
- }
-
- /**
- * Build a zookeeper client.
- *
- * @return zookeeper client.
- */
- public synchronized ZooKeeperClient build() {
- if (null == cachedClient) {
- cachedClient = buildClient();
- }
- return cachedClient;
- }
-
- private ZooKeeperClient buildClient() {
- validateParameters();
-
- Credentials credentials = Credentials.NONE;
- if (null != zkAclId) {
- credentials = new DigestCredentials(zkAclId, zkAclId);
- }
-
- return new ZooKeeperClient(
- name,
- sessionTimeoutMs,
- conectionTimeoutMs,
- zkServers,
- retryPolicy,
- statsLogger,
- retryThreadCount,
- requestRateLimit,
- credentials
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
deleted file mode 100644
index 5fcc87e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-/**
- * Access Control on stream operations
- */
-public interface AccessControlManager {
-
- /**
- * Whether writing to a stream is allowed.
- *
- * @param stream
- * Stream to write
- * @return true if writing to the given stream is allowed, otherwise false.
- */
- boolean allowWrite(String stream);
-
- /**
- * Whether truncating a given stream is allowed.
- *
- * @param stream
- * Stream to truncate
- * @return true if truncating the given stream is allowed.
- */
- boolean allowTruncate(String stream);
-
- /**
- * Whether deleting a given stream is allowed.
- *
- * @param stream
- * Stream to delete
- * @return true if deleting the given stream is allowed.
- */
- boolean allowDelete(String stream);
-
- /**
- * Whether proxies are allowed to acquire a given stream.
- *
- * @param stream
- * stream to acquire
- * @return true if proxies are allowed to acquire the given stream.
- */
- boolean allowAcquire(String stream);
-
- /**
- * Whether proxies are allowed to release ownership of a given stream.
- *
- * @param stream
- * stream to release
- * @return true if proxies are allowed to release the given stream.
- */
- boolean allowRelease(String stream);
-
- /**
- * Close the access control manager.
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
deleted file mode 100644
index e757595..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-public class DefaultAccessControlManager implements AccessControlManager {
-
- public static final DefaultAccessControlManager INSTANCE = new DefaultAccessControlManager();
-
- private DefaultAccessControlManager() {
- }
-
- @Override
- public boolean allowWrite(String stream) {
- return true;
- }
-
- @Override
- public boolean allowTruncate(String stream) {
- return true;
- }
-
- @Override
- public boolean allowDelete(String stream) {
- return true;
- }
-
- @Override
- public boolean allowAcquire(String stream) {
- return true;
- }
-
- @Override
- public boolean allowRelease(String stream) {
- return true;
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
deleted file mode 100644
index 65109fc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Access Control for distributedlog streams.
- */
-package com.twitter.distributedlog.acl;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
deleted file mode 100644
index 0512907..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
+++ /dev/null
@@ -1,921 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.admin;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ReadUtils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.impl.acl.ZKAccessControl;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.metadata.MetadataUpdater;
-import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.distributedlog.tools.DistributedLogTool;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Admin Tool for DistributedLog.
- */
-public class DistributedLogAdmin extends DistributedLogTool {
-
- static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
-
- /**
-  * Repair a stream whose single inprogress log segment carries a sequence
-  * number that is not greater than the highest completed segment's.
-  *
-  * <p>The inprogress segment is renumbered to one past the highest completed
-  * sequence number. Nothing is changed when the stream has no inprogress
-  * segment or when that segment is already numbered above all completed ones.
-  *
-  * @param namespace
-  *          dl namespace
-  * @param metadataUpdater
-  *          metadata updater used to change the sequence number.
-  * @param streamName
-  *          stream name.
-  * @param verbose
-  *          print verbose messages.
-  * @param interactive
-  *          ask for confirmation before applying the change.
-  * @throws IOException
-  *          when more than one inprogress segment is found, or on failures
-  *          raised by the underlying calls.
-  */
- public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
-                                                                final MetadataUpdater metadataUpdater,
-                                                                final String streamName,
-                                                                final boolean verbose,
-                                                                final boolean interactive) throws IOException {
-     final DistributedLogManager manager = namespace.openLog(streamName);
-     try {
-         final List<LogSegmentMetadata> allSegments = manager.getLogSegments();
-         if (verbose) {
-             System.out.println("LogSegments for " + streamName + " : ");
-             for (LogSegmentMetadata each : allSegments) {
-                 System.out.println(each.getLogSegmentSequenceNumber() + "\t: " + each);
-             }
-         }
-         LOG.info("Get log segments for {} : {}", streamName, allSegments);
-
-         // Scan once: track the highest completed sequence number and the (single) inprogress segment.
-         long highestCompletedSeqNo = -1L;
-         LogSegmentMetadata inprogress = null;
-         for (LogSegmentMetadata each : allSegments) {
-             if (each.isInProgress()) {
-                 if (null != inprogress) {
-                     // a second inprogress segment means the stream is in a state this tool cannot repair
-                     throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + allSegments);
-                 }
-                 inprogress = each;
-                 continue;
-             }
-             highestCompletedSeqNo = Math.max(highestCompletedSeqNo, each.getLogSegmentSequenceNumber());
-         }
-
-         if (null == inprogress) {
-             // nothing to fix
-             return;
-         }
-         if (inprogress.getLogSegmentSequenceNumber() > highestCompletedSeqNo) {
-             // nothing to fix
-             return;
-         }
-         if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
-             return;
-         }
-
-         final long repairedSeqNo = highestCompletedSeqNo + 1;
-         final LogSegmentMetadata repaired =
-                 FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogress, repairedSeqNo));
-         LOG.info("Fixed {} : {} -> {} ",
-                 new Object[] { streamName, inprogress, repaired });
-         if (verbose) {
-             System.out.println("Fixed " + streamName + " : " + inprogress.getZNodeName()
-                     + " -> " + repaired.getZNodeName());
-             System.out.println("\t old: " + inprogress);
-             System.out.println("\t new: " + repaired);
-             System.out.println();
-         }
-     } finally {
-         manager.close();
-     }
- }
-
- private static class LogSegmentCandidate {
- final LogSegmentMetadata metadata;
- final LogRecordWithDLSN lastRecord;
-
- LogSegmentCandidate(LogSegmentMetadata metadata, LogRecordWithDLSN lastRecord) {
- this.metadata = metadata;
- this.lastRecord = lastRecord;
- }
-
- @Override
- public String toString() {
- return "LogSegmentCandidate[ metadata = " + metadata + ", last record = " + lastRecord + " ]";
- }
-
- }
-
- /**
-  * Orders segment candidates by delegating to {@code LogSegmentMetadata.COMPARATOR}
-  * on their metadata.
-  */
- private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR =
-         new Comparator<LogSegmentCandidate>() {
-             @Override
-             public int compare(LogSegmentCandidate left, LogSegmentCandidate right) {
-                 final LogSegmentMetadata leftMetadata = left.metadata;
-                 final LogSegmentMetadata rightMetadata = right.metadata;
-                 return LogSegmentMetadata.COMPARATOR.compare(leftMetadata, rightMetadata);
-             }
-         };
-
- private static class StreamCandidate {
-
- final String streamName;
- final SortedSet<LogSegmentCandidate> segmentCandidates =
- new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR);
-
- StreamCandidate(String streamName) {
- this.streamName = streamName;
- }
-
- synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) {
- segmentCandidates.add(segmentCandidate);
- }
-
- @Override
- public String toString() {
- return "StreamCandidate[ name = " + streamName + ", segments = " + segmentCandidates + " ]";
- }
- }
-
- /**
-  * Check and repair a namespace using a single checking thread.
-  *
-  * <p>Delegates to the overload that takes an explicit concurrency, passing 1.
-  *
-  * @throws IOException propagated from the delegate.
-  */
- public static void checkAndRepairDLNamespace(final URI uri,
-                                              final DistributedLogNamespace namespace,
-                                              final MetadataUpdater metadataUpdater,
-                                              final OrderedScheduler scheduler,
-                                              final boolean verbose,
-                                              final boolean interactive) throws IOException {
-     final int singleThreaded = 1;
-     checkAndRepairDLNamespace(
-             uri, namespace, metadataUpdater, scheduler, verbose, interactive, singleThreaded);
- }
-
- /**
-  * Check every stream of a namespace and repair the ones reported as corrupted.
-  *
-  * <p>Step 0 lists the streams and checks them with {@code concurrency} threads;
-  * step 1 repairs each corrupted stream in turn and stops at the first stream
-  * whose repair is declined.
-  *
-  * @param uri namespace uri, used in messages only.
-  * @param namespace namespace whose streams are checked.
-  * @param metadataUpdater updater used to apply repairs.
-  * @param scheduler scheduler handed to the segment checks.
-  * @param verbose print progress messages.
-  * @param interactive ask for confirmation before repairing.
-  * @param concurrency number of checking threads; must be positive.
-  * @throws IOException when checking or repairing fails.
-  */
- public static void checkAndRepairDLNamespace(final URI uri,
-                                              final DistributedLogNamespace namespace,
-                                              final MetadataUpdater metadataUpdater,
-                                              final OrderedScheduler scheduler,
-                                              final boolean verbose,
-                                              final boolean interactive,
-                                              final int concurrency) throws IOException {
-     Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
-     // 0. getting streams under a given uri.
-     final List<String> streamNames = Lists.newArrayList();
-     for (Iterator<String> cursor = namespace.getLogs(); cursor.hasNext(); ) {
-         streamNames.add(cursor.next());
-     }
-     if (verbose) {
-         System.out.println("- 0. checking streams under " + uri);
-     }
-     if (streamNames.isEmpty()) {
-         System.out.println("+ 0. nothing to check. quit.");
-         return;
-     }
-     final Map<String, StreamCandidate> corrupted =
-             checkStreams(namespace, streamNames, scheduler, concurrency);
-     final int numCorrupted = corrupted.size();
-     if (verbose) {
-         System.out.println("+ 0. " + numCorrupted + " corrupted streams found.");
-     }
-     if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + numCorrupted + " corrupted streams (Y/N) : ")) {
-         return;
-     }
-     if (verbose) {
-         System.out.println("- 1. repairing " + numCorrupted + " corrupted streams.");
-     }
-     for (StreamCandidate each : corrupted.values()) {
-         final boolean repaired = repairStream(metadataUpdater, each, verbose, interactive);
-         if (repaired) {
-             continue;
-         }
-         // the repair was declined: stop here and leave the remaining streams untouched
-         if (verbose) {
-             System.out.println("* 1. aborted repairing corrupted streams.");
-         }
-         return;
-     }
-     if (verbose) {
-         System.out.println("+ 1. repaired " + numCorrupted + " corrupted streams.");
-     }
- }
-
- /**
-  * Check the given streams with {@code concurrency} worker threads and collect
-  * the ones that need repair.
-  *
-  * <p>Workers pull stream names with the non-blocking {@code poll()}, so a worker
-  * that finds the queue drained exits instead of parking on it. Workers are
-  * always interrupted and joined before this method returns or throws.
-  *
-  * @param namespace namespace used to open the streams.
-  * @param streams names of the streams to check.
-  * @param scheduler scheduler handed to the segment checks.
-  * @param concurrency number of worker threads to start.
-  * @return map from stream name to its candidate, containing only streams for
-  *         which {@code checkStream} returned a candidate.
-  * @throws IOException when at least one stream could not be checked.
-  */
- private static Map<String, StreamCandidate> checkStreams(
-         final DistributedLogNamespace namespace,
-         final Collection<String> streams,
-         final OrderedScheduler scheduler,
-         final int concurrency) throws IOException {
-     final Map<String, StreamCandidate> candidateMap =
-             new ConcurrentSkipListMap<String, StreamCandidate>();
-     if (streams.isEmpty()) {
-         // with no work nobody would ever release the latch below; return instead of waiting forever
-         return candidateMap;
-     }
-     final LinkedBlockingQueue<String> streamQueue =
-             new LinkedBlockingQueue<String>();
-     streamQueue.addAll(streams);
-     final AtomicInteger numPendingStreams = new AtomicInteger(streams.size());
-     final CountDownLatch doneLatch = new CountDownLatch(1);
-     Runnable checkRunnable = new Runnable() {
-         @Override
-         public void run() {
-             String stream;
-             // test the interrupt flag before polling so an interrupted worker never drops a polled stream
-             while (!Thread.currentThread().isInterrupted()
-                     && null != (stream = streamQueue.poll())) {
-                 StreamCandidate candidate;
-                 try {
-                     LOG.info("Checking stream {}.", stream);
-                     candidate = checkStream(namespace, stream, scheduler);
-                     LOG.info("Checked stream {} - {}.", stream, candidate);
-                 } catch (IOException e) {
-                     LOG.error("Error on checking stream {} : ", stream, e);
-                     // the whole check is going to fail: drop the remaining work so the other workers wind down
-                     streamQueue.clear();
-                     doneLatch.countDown();
-                     return;
-                 }
-                 if (null != candidate) {
-                     candidateMap.put(stream, candidate);
-                 }
-                 if (numPendingStreams.decrementAndGet() == 0) {
-                     doneLatch.countDown();
-                 }
-             }
-         }
-     };
-     Thread[] threads = new Thread[concurrency];
-     for (int i = 0; i < concurrency; i++) {
-         threads[i] = new Thread(checkRunnable, "check-thread-" + i);
-         threads[i].start();
-     }
-     try {
-         doneLatch.await();
-     } catch (InterruptedException e) {
-         Thread.currentThread().interrupt();
-     } finally {
-         // stop and reap the workers on every path, including failure and interruption
-         streamQueue.clear();
-         for (Thread thread : threads) {
-             thread.interrupt();
-             try {
-                 thread.join();
-             } catch (InterruptedException e) {
-                 Thread.currentThread().interrupt();
-             }
-         }
-     }
-     if (numPendingStreams.get() != 0) {
-         throw new IOException(numPendingStreams.get() + " streams left w/o checked");
-     }
-     return candidateMap;
- }
-
- private static StreamCandidate checkStream(
- final DistributedLogNamespace namespace,
- final String streamName,
- final OrderedScheduler scheduler) throws IOException {
- DistributedLogManager dlm = namespace.openLog(streamName);
- try {
- List<LogSegmentMetadata> segments = dlm.getLogSegments();
- if (segments.isEmpty()) {
- return null;
- }
- List<Future<LogSegmentCandidate>> futures =
- new ArrayList<Future<LogSegmentCandidate>>(segments.size());
- for (LogSegmentMetadata segment : segments) {
- futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
- }
- List<LogSegmentCandidate> segmentCandidates;
- try {
- segmentCandidates = Await.result(Future.collect(futures));
- } catch (Exception e) {
- throw new IOException("Failed on checking stream " + streamName, e);
- }
- StreamCandidate streamCandidate = new StreamCandidate(streamName);
- for (LogSegmentCandidate segmentCandidate: segmentCandidates) {
- if (null != segmentCandidate) {
- streamCandidate.addLogSegmentCandidate(segmentCandidate);
- }
- }
- if (streamCandidate.segmentCandidates.isEmpty()) {
- return null;
- }
- return streamCandidate;
- } finally {
- dlm.close();
- }
- }
-
- private static Future<LogSegmentCandidate> checkLogSegment(
- final DistributedLogNamespace namespace,
- final String streamName,
- final LogSegmentMetadata metadata,
- final OrderedScheduler scheduler) {
- if (metadata.isInProgress()) {
- return Future.value(null);
- }
-
- final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
- .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
- return ReadUtils.asyncReadLastRecord(
- streamName,
- metadata,
- true,
- false,
- true,
- 4,
- 16,
- new AtomicInteger(0),
- scheduler,
- entryStore
- ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
- @Override
- public LogSegmentCandidate apply(LogRecordWithDLSN record) {
- if (null != record &&
- (record.getDlsn().compareTo(metadata.getLastDLSN()) > 0 ||
- record.getTransactionId() > metadata.getLastTxId() ||
- !metadata.isRecordPositionWithinSegmentScope(record))) {
- return new LogSegmentCandidate(metadata, record);
- } else {
- return null;
- }
- }
- });
- }
-
- private static boolean repairStream(MetadataUpdater metadataUpdater,
- StreamCandidate streamCandidate,
- boolean verbose,
- boolean interactive) throws IOException {
- if (verbose) {
- System.out.println("Stream " + streamCandidate.streamName + " : ");
- for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
- System.out.println(" " + segmentCandidate.metadata.getLogSegmentSequenceNumber()
- + " : metadata = " + segmentCandidate.metadata + ", last dlsn = "
- + segmentCandidate.lastRecord.getDlsn());
- }
- System.out.println("-------------------------------------------");
- }
- if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) {
- return false;
- }
- for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
- LogSegmentMetadata newMetadata = FutureUtils.result(
- metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord));
- if (verbose) {
- System.out.println(" Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : ");
- System.out.println(" old metadata : " + segmentCandidate.metadata);
- System.out.println(" new metadata : " + newMetadata);
- }
- }
- if (verbose) {
- System.out.println("-------------------------------------------");
- }
- return true;
- }
-
- //
- // Commands
- //
-
- /**
-  * Unbind the bookkeeper environment for a given distributedlog uri.
-  *
-  * <p>The command resolves the current binding, shows it, asks for confirmation
-  * unless {@code -f} is given, and then removes the binding. The zookeeper
-  * client opened for the lookup is closed on every exit path.
-  *
-  * TODO: move unbind operation to namespace driver
-  */
- class UnbindCommand extends OptsCommand {
-
-     Options options = new Options();
-
-     UnbindCommand() {
-         super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
-         options.addOption("f", "force", false, "Force unbinding without prompt.");
-     }
-
-     @Override
-     protected Options getOptions() {
-         return options;
-     }
-
-     @Override
-     protected String getUsage() {
-         return "unbind [options] <distributedlog uri>";
-     }
-
-     /**
-      * Run the unbind command.
-      *
-      * @param cmdline parsed command line; its first argument is the distributedlog uri.
-      * @return -1 when no uri is given, 0 otherwise (including when nothing is
-      *         bound or the prompt is declined).
-      */
-     @Override
-     protected int runCmd(CommandLine cmdline) throws Exception {
-         String[] args = cmdline.getArgs();
-         if (args.length <= 0) {
-             System.err.println("No distributedlog uri specified.");
-             printUsage();
-             return -1;
-         }
-         boolean force = cmdline.hasOption("f");
-         URI uri = URI.create(args[0]);
-         // resolving the uri to see if there is another bindings in this uri.
-         ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri)
-                 .sessionTimeoutMs(10000).build();
-         try {
-             BKDLConfig bkdlConfig;
-             try {
-                 bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-             } catch (IOException ie) {
-                 // a failed lookup is treated the same as "nothing bound"
-                 bkdlConfig = null;
-             }
-             if (null == bkdlConfig) {
-                 System.out.println("No bookkeeper is bound to " + uri);
-                 return 0;
-             }
-             System.out.println("There is bookkeeper bound to " + uri + " : ");
-             System.out.println("");
-             System.out.println(bkdlConfig.toString());
-             System.out.println("");
-             if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) {
-                 return 0;
-             }
-             DLMetadata.unbind(uri);
-             System.out.println("Unbound on " + uri + ".");
-             return 0;
-         } finally {
-             // mirror BindCommand: never leave the lookup client open
-             zkc.close();
-         }
-     }
- }
-
- /**
- * Bind Command to bind bookkeeper environment for a given distributed uri.
- *
- * <p>With <code>-q</code> the command only prints the current binding. Otherwise it
- * builds a new binding from the options, compares it with the current one, and
- * creates (<code>-c</code>) or updates the binding.
- *
- * TODO: move bind to namespace driver
- */
- class BindCommand extends OptsCommand {
-
- Options options = new Options();
-
- BindCommand() {
- super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
- options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
- options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
- options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers.");
- options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers.");
- options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers.");
- options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
- options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
- options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade");
- options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
- options.addOption("f", "force", false, "Force binding without prompt.");
- options.addOption("c", "creation", false, "Whether is it a creation binding.");
- options.addOption("q", "query", false, "Query the bookkeeper bindings");
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected String getUsage() {
- return "bind [options] <distributedlog uri>";
- }
-
- /**
- * Run the bind command.
- *
- * @param cmdline parsed command line; its first argument is the distributedlog uri.
- * @return -1 when required options or the uri are missing, 0 otherwise. A failed
- * create/update is reported on stderr but still returns 0.
- */
- @Override
- protected int runCmd(CommandLine cmdline) throws Exception {
- boolean isQuery = cmdline.hasOption("q");
- // outside query mode both -l and -s are required; missing either one is rejected
- if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
- System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
- printUsage();
- return -1;
- }
- String[] args = cmdline.getArgs();
- if (args.length <= 0) {
- System.err.println("No distributedlog uri specified.");
- printUsage();
- return -1;
- }
- boolean force = cmdline.hasOption("f");
- boolean creation = cmdline.hasOption("c");
- String bkLedgersPath = cmdline.getOptionValue("l");
- String bkZkServersForWriter = cmdline.getOptionValue("s");
- // sanity check is on unless -i is given with a value that does not parse to true
- boolean sanityCheckTxnID =
- !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
- // region id encoding is off unless -r is given with a value that parses to true
- boolean encodeRegionID =
- cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));
-
- // reader-side bookkeeper zk servers fall back to the writer-side ones
- String bkZkServersForReader;
- if (cmdline.hasOption("bkzr")) {
- bkZkServersForReader = cmdline.getOptionValue("bkzr");
- } else {
- bkZkServersForReader = bkZkServersForWriter;
- }
-
- URI uri = URI.create(args[0]);
-
- // writer-side dl zk servers fall back to the ones in the uri; reader-side fall back to writer-side
- String dlZkServersForWriter;
- String dlZkServersForReader;
- if (cmdline.hasOption("dlzw")) {
- dlZkServersForWriter = cmdline.getOptionValue("dlzw");
- } else {
- dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
- }
- if (cmdline.hasOption("dlzr")) {
- dlZkServersForReader = cmdline.getOptionValue("dlzr");
- } else {
- dlZkServersForReader = dlZkServersForWriter;
- }
-
- // resolving the uri to see if there is another bindings in this uri.
- ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
- .sessionTimeoutMs(10000).build();
- try {
- BKDLConfig newBKDLConfig =
- new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
- bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
- .setSanityCheckTxnID(sanityCheckTxnID)
- .setEncodeRegionID(encodeRegionID);
-
- if (cmdline.hasOption("seqno")) {
- newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno")));
- }
-
- if (cmdline.hasOption("fns")) {
- newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
- }
-
- BKDLConfig bkdlConfig;
- try {
- bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
- } catch (IOException ie) {
- // a failed lookup is treated the same as "nothing bound"
- bkdlConfig = null;
- }
- if (null == bkdlConfig) {
- System.out.println("No bookkeeper is bound to " + uri);
- } else {
- System.out.println("There is bookkeeper bound to " + uri + " : ");
- System.out.println("");
- System.out.println(bkdlConfig.toString());
- System.out.println("");
- if (!isQuery) {
- if (newBKDLConfig.equals(bkdlConfig)) {
- System.out.println("No bookkeeper binding needs to be updated. Quit.");
- return 0;
- } else if(!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
- System.out.println("You can't turn a federated namespace back to non-federated.");
- return 0;
- } else {
- if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
- + " with new bookkeeper instance :\n" + newBKDLConfig)) {
- return 0;
- }
- }
- }
- }
- // query mode stops here, after the existing binding (if any) has been printed
- if (isQuery) {
- System.out.println("Done.");
- return 0;
- }
- DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
- if (creation) {
- try {
- dlMetadata.create(uri);
- System.out.println("Created binding on " + uri + ".");
- } catch (IOException ie) {
- // NOTE(review): the failure is only printed; execution continues and the command still returns 0
- System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
- }
- } else {
- try {
- dlMetadata.update(uri);
- System.out.println("Updated binding on " + uri + " : ");
- System.out.println("");
- System.out.println(newBKDLConfig.toString());
- System.out.println("");
- } catch (IOException ie) {
- // NOTE(review): the failure is only printed; execution continues and the command still returns 0
- System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
- }
- }
- if (newBKDLConfig.isFederatedNamespace()) {
- try {
- FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
- } catch (KeeperException.NodeExistsException nee) {
- // ignore node exists exception
- }
- }
- return 0;
- } finally {
- zkc.close();
- }
- }
- }
-
- /**
-  * Command that repairs, for each listed stream, an inprogress log segment
-  * whose sequence number is too low.
-  */
- static class RepairSeqNoCommand extends PerDLCommand {
-
-     boolean dryrun = false;
-     boolean verbose = false;
-     final List<String> streams = new ArrayList<String>();
-
-     RepairSeqNoCommand() {
-         super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
-         options.addOption("d", "dryrun", false, "Dry run without repairing");
-         options.addOption("l", "list", true, "List of streams to repair, separated by comma");
-         options.addOption("v", "verbose", false, "Print verbose messages");
-     }
-
-     /**
-      * Read the dryrun/verbose/force flags and the mandatory comma separated stream list.
-      *
-      * @throws ParseException when no stream list is given.
-      */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         dryrun = cmdline.hasOption("d");
-         verbose = cmdline.hasOption("v");
-         // a dry run never forces: force is only honoured when not in dryrun mode
-         force = !dryrun && cmdline.hasOption("f");
-         if (!cmdline.hasOption("l")) {
-             throw new ParseException("No streams provided to repair");
-         }
-         final String[] names = cmdline.getOptionValue("l").split(",");
-         Collections.addAll(streams, names);
-     }
-
-     /**
-      * Confirm the stream list, then repair the streams one by one.
-      *
-      * @return -1 when the confirmation is declined, 0 otherwise.
-      */
-     @Override
-     protected int runCmd() throws Exception {
-         final MetadataUpdater metadataUpdater;
-         if (dryrun) {
-             metadataUpdater = new DryrunLogSegmentMetadataStoreUpdater(getConf(),
-                     getLogSegmentMetadataStore());
-         } else {
-             metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
-                     getLogSegmentMetadataStore());
-         }
-         System.out.println("List of streams : ");
-         System.out.println(streams);
-         final boolean confirmed =
-                 IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):");
-         if (!confirmed) {
-             return -1;
-         }
-         for (String each : streams) {
-             fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, each, verbose, !getForce());
-         }
-         return 0;
-     }
-
-     @Override
-     protected String getUsage() {
-         return "repairseqno [options]";
-     }
- }
-
- static class DLCKCommand extends PerDLCommand {
-
- boolean dryrun = false;
- boolean verbose = false;
- int concurrency = 1;
-
- DLCKCommand() {
- super("dlck", "Check and repair a distributedlog namespace");
- options.addOption("d", "dryrun", false, "Dry run without repairing");
- options.addOption("v", "verbose", false, "Print verbose messages");
- options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
- }
-
- @Override
- protected void parseCommandLine(CommandLine cmdline) throws ParseException {
- super.parseCommandLine(cmdline);
- dryrun = cmdline.hasOption("d");
- verbose = cmdline.hasOption("v");
- if (cmdline.hasOption("cy")) {
- try {
- concurrency = Integer.parseInt(cmdline.getOptionValue("cy"));
- } catch (NumberFormatException nfe) {
- throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
- }
- }
- }
-
- @Override
- protected int runCmd() throws Exception {
- MetadataUpdater metadataUpdater = dryrun ?
- new DryrunLogSegmentMetadataStoreUpdater(getConf(),
- getLogSegmentMetadataStore()) :
- LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
- getLogSegmentMetadataStore());
- OrderedScheduler scheduler = OrderedScheduler.newBuilder()
- .name("dlck-scheduler")
- .corePoolSize(Runtime.getRuntime().availableProcessors())
- .build();
- ExecutorService executorService = Executors.newCachedThreadPool();
- try {
- checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
- verbose, !getForce(), concurrency);
- } finally {
- SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
- }
- return 0;
- }
-
- @Override
- protected String getUsage() {
- return "dlck [options]";
- }
- }
-
- /**
-  * Command that deletes the ACL stored for a single stream.
-  */
- static class DeleteStreamACLCommand extends PerDLCommand {
-
-     String stream = null;
-
-     DeleteStreamACLCommand() {
-         super("delete_stream_acl", "Delete ACL for a given stream");
-         options.addOption("s", "stream", true, "Stream to delete ACL");
-     }
-
-     /**
-      * Read the mandatory stream name.
-      *
-      * @throws ParseException when no stream is given.
-      */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         if (!cmdline.hasOption("s")) {
-             throw new ParseException("No stream to delete ACL");
-         }
-         stream = cmdline.getOptionValue("s");
-     }
-
-     /**
-      * Delete the stream's ACL node.
-      *
-      * @return -1 when the namespace has no ACL root path, 0 otherwise.
-      */
-     @Override
-     protected int runCmd() throws Exception {
-         BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
-         if (null == bkdlConfig.getACLRootPath()) {
-             // acl isn't enabled for this namespace.
-             System.err.println("ACL isn't enabled for namespace " + getUri());
-             return -1;
-         }
-         // use only the path component of the uri, as SetACLCommand does; the full
-         // uri string (scheme and host included) is not a zookeeper path
-         String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + stream;
-         ZKAccessControl.delete(getZooKeeperClient(), zkPath);
-         return 0;
-     }
-
-     @Override
-     protected String getUsage() {
-         return "delete_stream_acl [options]";
-     }
- }
-
- /**
-  * Command that sets the ACL of a single stream; the ACL node is the stream
-  * name appended to the ACL root path.
-  */
- static class SetStreamACLCommand extends SetACLCommand {
-
-     String stream = null;
-
-     SetStreamACLCommand() {
-         super("set_stream_acl", "Set Default ACL for a given stream");
-         options.addOption("s", "stream", true, "Stream to set ACL");
-     }
-
-     /**
-      * Read the deny flags (handled by the parent) and the mandatory stream name.
-      *
-      * @throws ParseException when no stream is given.
-      */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         final boolean streamGiven = cmdline.hasOption("s");
-         if (!streamGiven) {
-             throw new ParseException("No stream to set ACL");
-         }
-         stream = cmdline.getOptionValue("s");
-     }
-
-     @Override
-     protected String getZKPath(String zkRootPath) {
-         final StringBuilder path = new StringBuilder(zkRootPath);
-         path.append("/").append(stream);
-         return path.toString();
-     }
-
-     @Override
-     protected String getUsage() {
-         return "set_stream_acl [options]";
-     }
- }
-
- /**
-  * Command that sets the default ACL of a namespace; the ACL node is the
-  * ACL root path itself.
-  */
- static class SetDefaultACLCommand extends SetACLCommand {
-
-     SetDefaultACLCommand() {
-         super("set_default_acl", "Set Default ACL for a namespace");
-     }
-
-     @Override
-     protected String getZKPath(String zkRootPath) {
-         // the default ACL lives directly on the root node
-         final String defaultAclPath = zkRootPath;
-         return defaultAclPath;
-     }
-
-     @Override
-     protected String getUsage() {
-         final String usage = "set_default_acl [options]";
-         return usage;
-     }
- }
-
- /**
-  * Base class for the commands that write an access control entry; subclasses
-  * decide which zookeeper node the entry is stored at.
-  */
- static abstract class SetACLCommand extends PerDLCommand {
-
-     boolean denyWrite = false;
-     boolean denyTruncate = false;
-     boolean denyDelete = false;
-     boolean denyAcquire = false;
-     boolean denyRelease = false;
-
-     protected SetACLCommand(String name, String description) {
-         super(name, description);
-         options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
-         options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
-         options.addOption("dd", "deny-delete", false, "Deny delete requests");
-         options.addOption("da", "deny-acquire", false, "Deny acquire requests");
-         options.addOption("dr", "deny-release", false, "Deny release requests");
-     }
-
-     /** Read the five deny flags from the command line. */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         denyWrite = cmdline.hasOption("dw");
-         denyTruncate = cmdline.hasOption("dt");
-         denyDelete = cmdline.hasOption("dd");
-         denyAcquire = cmdline.hasOption("da");
-         denyRelease = cmdline.hasOption("dr");
-     }
-
-     /**
-      * Map the ACL root path to the node this command writes.
-      *
-      * @param zkRootPath ACL root path of the namespace.
-      * @return path of the node holding the access control entry.
-      */
-     protected abstract String getZKPath(String zkRootPath);
-
-     /**
-      * Read the access control stored at {@code zkPath}, or build one around a
-      * fresh entry when the read reports that the node does not exist.
-      */
-     protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
-         try {
-             return Await.result(ZKAccessControl.read(zkc, zkPath, null));
-         } catch (KeeperException.NoNodeException nne) {
-             return new ZKAccessControl(new AccessControlEntry(), zkPath);
-         }
-     }
-
-     /** Create the access control node when it is absent, update it otherwise. */
-     protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
-         final String zkPath = accessControl.getZKPath();
-         final boolean nodePresent = null != zkc.get().exists(zkPath, false);
-         if (nodePresent) {
-             accessControl.update(zkc);
-         } else {
-             accessControl.create(zkc);
-         }
-     }
-
-     /**
-      * Apply the parsed deny flags to the access control entry and store it.
-      *
-      * @return -1 when the namespace has no ACL root path, 0 otherwise.
-      */
-     @Override
-     protected int runCmd() throws Exception {
-         final BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
-         if (null == bkdlConfig.getACLRootPath()) {
-             // acl isn't enabled for this namespace.
-             System.err.println("ACL isn't enabled for namespace " + getUri());
-             return -1;
-         }
-         final String aclRoot = getUri().getPath() + "/" + bkdlConfig.getACLRootPath();
-         final String zkPath = getZKPath(aclRoot);
-         final ZKAccessControl accessControl = getZKAccessControl(getZooKeeperClient(), zkPath);
-         final AccessControlEntry entry = accessControl.getAccessControlEntry();
-         entry.setDenyWrite(denyWrite);
-         entry.setDenyTruncate(denyTruncate);
-         entry.setDenyDelete(denyDelete);
-         entry.setDenyAcquire(denyAcquire);
-         entry.setDenyRelease(denyRelease);
-         setZKAccessControl(getZooKeeperClient(), accessControl);
-         return 0;
-     }
-
- }
-
- /**
- * Create the admin tool: clears the commands collection after the superclass
- * constructor has run and registers only the admin commands.
- */
- public DistributedLogAdmin() {
- super();
- // start from an empty command set so only the commands added below are available
- commands.clear();
- addCommand(new HelpCommand());
- addCommand(new BindCommand());
- addCommand(new UnbindCommand());
- addCommand(new RepairSeqNoCommand());
- addCommand(new DLCKCommand());
- addCommand(new SetDefaultACLCommand());
- addCommand(new SetStreamACLCommand());
- addCommand(new DeleteStreamACLCommand());
- }
-
- @Override
- protected String getName() {
- return "dlog_admin";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
deleted file mode 100644
index a7d6adb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Admin Tools for DistributedLog
- */
-package com.twitter.distributedlog.admin;