You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:41 UTC
[27/31] incubator-distributedlog git commit: DL-163: clean up direct
zookeeper and bookkeeper usage and use metadata/data store abstraction
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
deleted file mode 100644
index bf16256..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class ZKAccessControl {
-
- private static final int BUFFER_SIZE = 4096;
-
- public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry();
-
- public static class CorruptedAccessControlException extends IOException {
-
- private static final long serialVersionUID = 5391285182476211603L;
-
- public CorruptedAccessControlException(String zkPath, Throwable t) {
- super("Access Control @ " + zkPath + " is corrupted.", t);
- }
- }
-
- protected final AccessControlEntry accessControlEntry;
- protected final String zkPath;
- private int zkVersion;
-
- public ZKAccessControl(AccessControlEntry ace, String zkPath) {
- this(ace, zkPath, -1);
- }
-
- private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) {
- this.accessControlEntry = ace;
- this.zkPath = zkPath;
- this.zkVersion = zkVersion;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(zkPath, accessControlEntry);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ZKAccessControl)) {
- return false;
- }
- ZKAccessControl other = (ZKAccessControl) obj;
- return Objects.equal(zkPath, other.zkPath) &&
- Objects.equal(accessControlEntry, other.accessControlEntry);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("entry(path=").append(zkPath).append(", acl=")
- .append(accessControlEntry).append(")");
- return sb.toString();
- }
-
- public String getZKPath() {
- return zkPath;
- }
-
- public AccessControlEntry getAccessControlEntry() {
- return accessControlEntry;
- }
-
- public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
- try {
- zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
- new AsyncCallback.StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if (KeeperException.Code.OK.intValue() == rc) {
- ZKAccessControl.this.zkVersion = 0;
- promise.setValue(ZKAccessControl.this);
- } else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- } catch (IOException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
- try {
- zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- ZKAccessControl.this.zkVersion = stat.getVersion();
- promise.setValue(ZKAccessControl.this);
- } else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- } catch (IOException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-
- try {
- zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- try {
- AccessControlEntry ace = deserialize(zkPath, data);
- promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
- } catch (IOException ioe) {
- promise.setException(ioe);
- }
- } else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
- final Promise<Void> promise = new Promise<Void>();
-
- try {
- zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (KeeperException.Code.OK.intValue() == rc ||
- KeeperException.Code.NONODE.intValue() == rc) {
- promise.setValue(null);
- } else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- static byte[] serialize(AccessControlEntry ace) throws IOException {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- ace.write(protocol);
- transport.flush();
- return transport.toString(UTF_8.name()).getBytes(UTF_8);
- } catch (TException e) {
- throw new IOException("Failed to serialize access control entry : ", e);
- } catch (UnsupportedEncodingException uee) {
- throw new IOException("Failed to serialize acesss control entry : ", uee);
- }
- }
-
- static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException {
- if (data.length == 0) {
- return DEFAULT_ACCESS_CONTROL_ENTRY;
- }
-
- AccessControlEntry ace = new AccessControlEntry();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- ace.read(protocol);
- } catch (TException e) {
- throw new CorruptedAccessControlException(zkPath, e);
- }
- return ace;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
deleted file mode 100644
index 9c89b4a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.acl;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager}
- */
-public class ZKAccessControlManager implements AccessControlManager, Watcher {
-
- private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
-
- private static final int ZK_RETRY_BACKOFF_MS = 500;
-
- protected final DistributedLogConfiguration conf;
- protected final ZooKeeperClient zkc;
- protected final String zkRootPath;
- protected final ScheduledExecutorService scheduledExecutorService;
-
- protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
- protected ZKAccessControl defaultAccessControl;
- protected volatile boolean closed = false;
-
- public ZKAccessControlManager(DistributedLogConfiguration conf,
- ZooKeeperClient zkc,
- String zkRootPath,
- ScheduledExecutorService scheduledExecutorService) throws IOException {
- this.conf = conf;
- this.zkc = zkc;
- this.zkRootPath = zkRootPath;
- this.scheduledExecutorService = scheduledExecutorService;
- this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
- try {
- Await.result(fetchDefaultAccessControlEntry());
- } catch (Throwable t) {
- if (t instanceof InterruptedException) {
- throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
- } else if (t instanceof KeeperException) {
- throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t);
- } else if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
- }
- }
-
- try {
- Await.result(fetchAccessControlEntries());
- } catch (Throwable t) {
- if (t instanceof InterruptedException) {
- throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
- } else if (t instanceof KeeperException) {
- throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t);
- } else if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
- }
- }
- }
-
- protected AccessControlEntry getAccessControlEntry(String stream) {
- ZKAccessControl entry = streamEntries.get(stream);
- entry = null == entry ? defaultAccessControl : entry;
- return entry.getAccessControlEntry();
- }
-
- @Override
- public boolean allowWrite(String stream) {
- return !getAccessControlEntry(stream).isDenyWrite();
- }
-
- @Override
- public boolean allowTruncate(String stream) {
- return !getAccessControlEntry(stream).isDenyTruncate();
- }
-
- @Override
- public boolean allowDelete(String stream) {
- return !getAccessControlEntry(stream).isDenyDelete();
- }
-
- @Override
- public boolean allowAcquire(String stream) {
- return !getAccessControlEntry(stream).isDenyAcquire();
- }
-
- @Override
- public boolean allowRelease(String stream) {
- return !getAccessControlEntry(stream).isDenyRelease();
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- private Future<Void> fetchAccessControlEntries() {
- final Promise<Void> promise = new Promise<Void>();
- fetchAccessControlEntries(promise);
- return promise;
- }
-
- private void fetchAccessControlEntries(final Promise<Void> promise) {
- try {
- zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- if (KeeperException.Code.OK.intValue() != rc) {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- return;
- }
- Set<String> streamsReceived = new HashSet<String>();
- streamsReceived.addAll(children);
- Set<String> streamsCached = streamEntries.keySet();
- Set<String> streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy();
- for (String s : streamsRemoved) {
- ZKAccessControl accessControl = streamEntries.remove(s);
- if (null != accessControl) {
- logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry());
- }
- }
- if (streamsReceived.isEmpty()) {
- promise.setValue(null);
- return;
- }
- final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
- final AtomicInteger numFailures = new AtomicInteger(0);
- for (String s : streamsReceived) {
- final String streamName = s;
- ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null)
- .addEventListener(new FutureEventListener<ZKAccessControl>() {
-
- @Override
- public void onSuccess(ZKAccessControl accessControl) {
- streamEntries.put(streamName, accessControl);
- logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry());
- complete();
- }
-
- @Override
- public void onFailure(Throwable cause) {
- if (cause instanceof KeeperException.NoNodeException) {
- streamEntries.remove(streamName);
- } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
- logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...",
- new Object[] { streamName, zkRootPath, cause });
- streamEntries.remove(streamName);
- } else {
- if (1 == numFailures.incrementAndGet()) {
- promise.setException(cause);
- }
- }
- complete();
- }
-
- private void complete() {
- if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
- promise.setValue(null);
- }
- }
- });
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- }
- }
-
- private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
- fetchDefaultAccessControlEntry(promise);
- return promise;
- }
-
- private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) {
- ZKAccessControl.read(zkc, zkRootPath, this)
- .addEventListener(new FutureEventListener<ZKAccessControl>() {
- @Override
- public void onSuccess(ZKAccessControl accessControl) {
- logger.info("Default Access Control will be changed from {} to {}",
- ZKAccessControlManager.this.defaultAccessControl,
- accessControl);
- ZKAccessControlManager.this.defaultAccessControl = accessControl;
- promise.setValue(accessControl);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- if (cause instanceof KeeperException.NoNodeException) {
- logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath);
- createDefaultAccessControlEntryIfNeeded(promise);
- } else {
- promise.setException(cause);
- }
- }
- });
- }
-
- private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
- ZooKeeper zk;
- try {
- zk = zkc.get();
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- return;
- } catch (InterruptedException e) {
- promise.setException(e);
- return;
- }
- ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
- CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if (KeeperException.Code.OK.intValue() == rc) {
- logger.info("Created zk path {} for default ACL.", zkRootPath);
- fetchDefaultAccessControlEntry(promise);
- } else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- }
-
- private void refetchDefaultAccessControlEntry(final int delayMs) {
- if (closed) {
- return;
- }
- scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
- @Override
- public void onSuccess(ZKAccessControl value) {
- // no-op
- }
- @Override
- public void onFailure(Throwable cause) {
- if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
- logger.warn("Default access control entry is corrupted, ignore this update : ", cause);
- return;
- }
-
- logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ",
- ZK_RETRY_BACKOFF_MS, cause);
- refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS);
- }
- });
- }
- }, delayMs, TimeUnit.MILLISECONDS);
- }
-
- private void refetchAccessControlEntries(final int delayMs) {
- if (closed) {
- return;
- }
- scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- // no-op
- }
- @Override
- public void onFailure(Throwable cause) {
- logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ",
- ZK_RETRY_BACKOFF_MS, cause);
- refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
- }
- });
- }
- }, delayMs, TimeUnit.MILLISECONDS);
- }
-
- private void refetchAllAccessControlEntries(final int delayMs) {
- if (closed) {
- return;
- }
- scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
- @Override
- public void onSuccess(ZKAccessControl value) {
- fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- // no-op
- }
-
- @Override
- public void onFailure(Throwable cause) {
- logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ",
- ZK_RETRY_BACKOFF_MS, cause);
- refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable cause) {
- logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ",
- ZK_RETRY_BACKOFF_MS, cause);
- refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS);
- }
- });
- }
- }, delayMs, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (Event.EventType.None.equals(event.getType())) {
- if (event.getState() == Event.KeeperState.Expired) {
- refetchAllAccessControlEntries(0);
- }
- } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
- logger.info("Default ACL for {} is changed, refetching ...", zkRootPath);
- refetchDefaultAccessControlEntry(0);
- } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {
- logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath);
- refetchAccessControlEntries(0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
index 0a3fdb0..0512907 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
@@ -18,28 +18,27 @@
package com.twitter.distributedlog.admin;
import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.google.common.collect.Lists;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.ReadUtils;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.acl.ZKAccessControl;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.DLMetadata;
import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.metadata.MetadataUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
import com.twitter.distributedlog.thrift.AccessControlEntry;
import com.twitter.distributedlog.tools.DistributedLogTool;
-import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.SchedulerUtils;
@@ -61,6 +60,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
@@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Admin Tool for DistributedLog.
*/
-@SuppressWarnings("deprecation")
public class DistributedLogAdmin extends DistributedLogTool {
static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
@@ -84,8 +83,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
/**
* Fix inprogress segment with lower ledger sequence number.
*
- * @param factory
- * dlm factory.
+ * @param namespace
+ * dl namespace
* @param metadataUpdater
* metadata updater.
* @param streamName
@@ -96,12 +95,12 @@ public class DistributedLogAdmin extends DistributedLogTool {
* is confirmation needed before executing actual action.
* @throws IOException
*/
- public static void fixInprogressSegmentWithLowerSequenceNumber(final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
final MetadataUpdater metadataUpdater,
final String streamName,
final boolean verbose,
final boolean interactive) throws IOException {
- DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+ DistributedLogManager dlm = namespace.openLog(streamName);
try {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
if (verbose) {
@@ -194,37 +193,37 @@ public class DistributedLogAdmin extends DistributedLogTool {
}
public static void checkAndRepairDLNamespace(final URI uri,
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final MetadataUpdater metadataUpdater,
final OrderedScheduler scheduler,
- final BookKeeperClient bkc,
- final String digestpw,
final boolean verbose,
final boolean interactive) throws IOException {
- checkAndRepairDLNamespace(uri, factory, metadataUpdater, scheduler, bkc, digestpw, verbose, interactive, 1);
+ checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
}
public static void checkAndRepairDLNamespace(final URI uri,
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final MetadataUpdater metadataUpdater,
final OrderedScheduler scheduler,
- final BookKeeperClient bkc,
- final String digestpw,
final boolean verbose,
final boolean interactive,
final int concurrency) throws IOException {
Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
// 0. getting streams under a given uri.
- Collection<String> streams = factory.enumerateAllLogsInNamespace();
+ Iterator<String> streamsIter = namespace.getLogs();
+ List<String> streams = Lists.newArrayList();
+ while (streamsIter.hasNext()) {
+ streams.add(streamsIter.next());
+ }
if (verbose) {
- System.out.println("- 0. checking " + streams.size() + " streams under " + uri);
+ System.out.println("- 0. checking streams under " + uri);
}
if (streams.size() == 0) {
System.out.println("+ 0. nothing to check. quit.");
return;
}
Map<String, StreamCandidate> streamCandidates =
- checkStreams(factory, streams, scheduler, bkc, digestpw, concurrency);
+ checkStreams(namespace, streams, scheduler, concurrency);
if (verbose) {
System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
}
@@ -248,11 +247,9 @@ public class DistributedLogAdmin extends DistributedLogTool {
}
private static Map<String, StreamCandidate> checkStreams(
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final Collection<String> streams,
final OrderedScheduler scheduler,
- final BookKeeperClient bkc,
- final String digestpw,
final int concurrency) throws IOException {
final LinkedBlockingQueue<String> streamQueue =
new LinkedBlockingQueue<String>();
@@ -275,7 +272,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
StreamCandidate candidate;
try {
LOG.info("Checking stream {}.", stream);
- candidate = checkStream(factory, stream, scheduler, bkc, digestpw);
+ candidate = checkStream(namespace, stream, scheduler);
LOG.info("Checked stream {} - {}.", stream, candidate);
} catch (IOException e) {
LOG.error("Error on checking stream {} : ", stream, e);
@@ -316,12 +313,10 @@ public class DistributedLogAdmin extends DistributedLogTool {
}
private static StreamCandidate checkStream(
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final String streamName,
- final OrderedScheduler scheduler,
- final BookKeeperClient bkc,
- String digestpw) throws IOException {
- DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+ final OrderedScheduler scheduler) throws IOException {
+ DistributedLogManager dlm = namespace.openLog(streamName);
try {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
if (segments.isEmpty()) {
@@ -330,7 +325,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
List<Future<LogSegmentCandidate>> futures =
new ArrayList<Future<LogSegmentCandidate>>(segments.size());
for (LogSegmentMetadata segment : segments) {
- futures.add(checkLogSegment(streamName, segment, scheduler, bkc, digestpw));
+ futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
}
List<LogSegmentCandidate> segmentCandidates;
try {
@@ -354,21 +349,16 @@ public class DistributedLogAdmin extends DistributedLogTool {
}
private static Future<LogSegmentCandidate> checkLogSegment(
+ final DistributedLogNamespace namespace,
final String streamName,
final LogSegmentMetadata metadata,
- final OrderedScheduler scheduler,
- final BookKeeperClient bkc,
- final String digestpw) {
+ final OrderedScheduler scheduler) {
if (metadata.isInProgress()) {
return Future.value(null);
}
- final LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
- new DistributedLogConfiguration().setBKDigestPW(digestpw),
- bkc,
- scheduler,
- NullStatsLogger.INSTANCE,
- AsyncFailureInjector.NULL);
+ final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
+ .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
return ReadUtils.asyncReadLastRecord(
streamName,
metadata,
@@ -432,6 +422,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
/**
* Unbind the bookkeeper environment for a given distributedlog uri.
+ *
+ * TODO: move unbind operation to namespace driver
*/
class UnbindCommand extends OptsCommand {
@@ -491,6 +483,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
/**
* Bind Command to bind bookkeeper environment for a given distributed uri.
+ *
+ * TODO: move bind to namespace driver
*/
class BindCommand extends OptsCommand {
@@ -559,7 +553,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
if (cmdline.hasOption("dlzw")) {
dlZkServersForWriter = cmdline.getOptionValue("dlzw");
} else {
- dlZkServersForWriter = DLUtils.getZKServersFromDLUri(uri);
+ dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
}
if (cmdline.hasOption("dlzr")) {
dlZkServersForReader = cmdline.getOptionValue("dlzr");
@@ -689,7 +683,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
return -1;
}
for (String stream : streams) {
- fixInprogressSegmentWithLowerSequenceNumber(getFactory(), metadataUpdater, stream, verbose, !getForce());
+ fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce());
}
return 0;
}
@@ -739,10 +733,9 @@ public class DistributedLogAdmin extends DistributedLogTool {
.corePoolSize(Runtime.getRuntime().availableProcessors())
.build();
ExecutorService executorService = Executors.newCachedThreadPool();
- BookKeeperClient bkc = getBookKeeperClient();
try {
- checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, scheduler,
- bkc, getConf().getBKDigestPW(), verbose, !getForce(), concurrency);
+ checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
+ verbose, !getForce(), concurrency);
} finally {
SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
index e551c22..a081606 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
@@ -21,18 +21,20 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
-import com.twitter.distributedlog.BKDistributedLogNamespace;
import com.twitter.distributedlog.BookKeeperClient;
import com.twitter.distributedlog.BookKeeperClientBuilder;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
import com.twitter.distributedlog.util.DLUtils;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -52,8 +54,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,7 +74,6 @@ import static com.google.common.base.Charsets.UTF_8;
/**
* DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk usage by streams.
*/
-@SuppressWarnings("deprecation")
public class DLAuditor {
private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class);
@@ -83,23 +84,23 @@ public class DLAuditor {
this.conf = conf;
}
- private ZooKeeperClient getZooKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) {
- DistributedLogNamespace namespace = factory.getNamespace();
- assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
+ private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) {
+ NamespaceDriver driver = namespace.getNamespaceDriver();
+ assert(driver instanceof BKNamespaceDriver);
+ return ((BKNamespaceDriver) driver).getWriterZKC();
}
- private BookKeeperClient getBookKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) {
- DistributedLogNamespace namespace = factory.getNamespace();
- assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getReaderBKC();
+ private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) {
+ NamespaceDriver driver = namespace.getNamespaceDriver();
+ assert(driver instanceof BKNamespaceDriver);
+ return ((BKNamespaceDriver) driver).getReaderBKC();
}
private String validateAndGetZKServers(List<URI> uris) {
URI firstURI = uris.get(0);
- String zkServers = DLUtils.getZKServersFromDLUri(firstURI);
+ String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
for (URI uri : uris) {
- if (!zkServers.equalsIgnoreCase(DLUtils.getZKServersFromDLUri(uri))) {
+ if (!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) {
throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster");
}
}
@@ -224,19 +225,23 @@ public class DLAuditor {
private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths)
throws IOException {
final Set<Long> ledgers = new TreeSet<Long>();
- List<com.twitter.distributedlog.DistributedLogManagerFactory> factories =
- new ArrayList<com.twitter.distributedlog.DistributedLogManagerFactory>(uris.size());
+ List<DistributedLogNamespace> namespaces =
+ new ArrayList<DistributedLogNamespace>(uris.size());
try {
for (URI uri : uris) {
- factories.add(new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri));
+ namespaces.add(
+ DistributedLogNamespaceBuilder.newBuilder()
+ .conf(conf)
+ .uri(uri)
+ .build());
}
final CountDownLatch doneLatch = new CountDownLatch(uris.size());
final AtomicInteger numFailures = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(uris.size());
try {
int i = 0;
- for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) {
- final com.twitter.distributedlog.DistributedLogManagerFactory dlFactory = factory;
+ for (final DistributedLogNamespace namespace : namespaces) {
+ final DistributedLogNamespace dlNamespace = namespace;
final URI uri = uris.get(i);
final List<String> aps = allocationPaths.get(i);
i++;
@@ -245,12 +250,12 @@ public class DLAuditor {
public void run() {
try {
logger.info("Collecting ledgers from {} : {}", uri, aps);
- collectLedgersFromAllocator(uri, dlFactory, aps, ledgers);
+ collectLedgersFromAllocator(uri, namespace, aps, ledgers);
synchronized (ledgers) {
logger.info("Collected {} ledgers from allocators for {} : {} ",
new Object[]{ledgers.size(), uri, ledgers});
}
- collectLedgersFromDL(uri, dlFactory, ledgers);
+ collectLedgersFromDL(uri, namespace, ledgers);
} catch (IOException e) {
numFailures.incrementAndGet();
logger.info("Error to collect ledgers from DL : ", e);
@@ -273,15 +278,15 @@ public class DLAuditor {
executor.shutdown();
}
} finally {
- for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) {
- factory.close();
+ for (DistributedLogNamespace namespace : namespaces) {
+ namespace.close();
}
}
return ledgers;
}
private void collectLedgersFromAllocator(final URI uri,
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final List<String> allocationPaths,
final Set<Long> ledgers) throws IOException {
final LinkedBlockingQueue<String> poolQueue =
@@ -289,7 +294,7 @@ public class DLAuditor {
for (String allocationPath : allocationPaths) {
String rootPath = uri.getPath() + "/" + allocationPath;
try {
- List<String> pools = getZooKeeperClient(factory).get().getChildren(rootPath, false);
+ List<String> pools = getZooKeeperClient(namespace).get().getChildren(rootPath, false);
for (String pool : pools) {
poolQueue.add(rootPath + "/" + pool);
}
@@ -318,11 +323,11 @@ public class DLAuditor {
private void collectLedgersFromPool(String poolPath)
throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
- List<String> allocators = getZooKeeperClient(factory).get()
+ List<String> allocators = getZooKeeperClient(namespace).get()
.getChildren(poolPath, false);
for (String allocator : allocators) {
String allocatorPath = poolPath + "/" + allocator;
- byte[] data = getZooKeeperClient(factory).get().getData(allocatorPath, false, new Stat());
+ byte[] data = getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
if (null != data && data.length > 0) {
try {
long ledgerId = DLUtils.bytes2LogSegmentId(data);
@@ -341,30 +346,31 @@ public class DLAuditor {
}
private void collectLedgersFromDL(final URI uri,
- final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ final DistributedLogNamespace namespace,
final Set<Long> ledgers) throws IOException {
logger.info("Enumerating {} to collect streams.", uri);
- Collection<String> streams = factory.enumerateAllLogsInNamespace();
+ Iterator<String> streams = namespace.getLogs();
final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
- streamQueue.addAll(streams);
+ while (streams.hasNext()) {
+ streamQueue.add(streams.next());
+ }
logger.info("Collected {} streams from uri {} : {}",
- new Object[] { streams.size(), uri, streams });
+ new Object[] { streamQueue.size(), uri, streams });
executeAction(streamQueue, 10, new Action<String>() {
@Override
public void execute(String stream) throws IOException {
- collectLedgersFromStream(factory, stream, ledgers);
+ collectLedgersFromStream(namespace, stream, ledgers);
}
});
}
- private List<Long> collectLedgersFromStream(com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace,
String stream,
Set<Long> ledgers)
throws IOException {
- DistributedLogManager dlm = factory.createDistributedLogManager(stream,
- com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients);
+ DistributedLogManager dlm = namespace.openLog(stream);
try {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
List<Long> sLedgers = new ArrayList<Long>();
@@ -388,21 +394,25 @@ public class DLAuditor {
*/
public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException {
logger.info("Collecting stream space usage for {}.", uri);
- com.twitter.distributedlog.DistributedLogManagerFactory factory =
- new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri);
+ DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ .conf(conf)
+ .uri(uri)
+ .build();
try {
- return calculateStreamSpaceUsage(uri, factory);
+ return calculateStreamSpaceUsage(uri, namespace);
} finally {
- factory.close();
+ namespace.close();
}
}
private Map<String, Long> calculateStreamSpaceUsage(
- final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory)
+ final URI uri, final DistributedLogNamespace namespace)
throws IOException {
- Collection<String> streams = factory.enumerateAllLogsInNamespace();
+ Iterator<String> streams = namespace.getLogs();
final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
- streamQueue.addAll(streams);
+ while (streams.hasNext()) {
+ streamQueue.add(streams.next());
+ }
final Map<String, Long> streamSpaceUsageMap =
new ConcurrentSkipListMap<String, Long>();
@@ -412,7 +422,7 @@ public class DLAuditor {
@Override
public void execute(String stream) throws IOException {
streamSpaceUsageMap.put(stream,
- calculateStreamSpaceUsage(factory, stream));
+ calculateStreamSpaceUsage(namespace, stream));
if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri);
}
@@ -422,16 +432,15 @@ public class DLAuditor {
return streamSpaceUsageMap;
}
- private long calculateStreamSpaceUsage(final com.twitter.distributedlog.DistributedLogManagerFactory factory,
+ private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace,
final String stream) throws IOException {
- DistributedLogManager dlm = factory.createDistributedLogManager(stream,
- com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients);
+ DistributedLogManager dlm = namespace.openLog(stream);
long totalBytes = 0;
try {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
for (LogSegmentMetadata segment : segments) {
try {
- LedgerHandle lh = getBookKeeperClient(factory).get().openLedgerNoRecovery(segment.getLogSegmentId(),
+ LedgerHandle lh = getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
totalBytes += lh.getLength();
lh.close();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
deleted file mode 100644
index dd78a4e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-
-import java.net.URI;
-
-/**
- * Utils for bookkeeper based distributedlog implementation
- */
-public class BKDLUtils {
-
- /**
- * Is it a reserved stream name in bkdl namespace?
- *
- * @param name
- * stream name
- * @return true if it is reserved name, otherwise false.
- */
- public static boolean isReservedStreamName(String name) {
- return name.startsWith(".");
- }
-
- /**
- * Validate the configuration and uri.
- *
- * @param conf
- * distributedlog configuration
- * @param uri
- * distributedlog uri
- * @throws IllegalArgumentException
- */
- public static void validateConfAndURI(DistributedLogConfiguration conf, URI uri)
- throws IllegalArgumentException {
- if (null == conf) {
- throw new IllegalArgumentException("Incorrect Configuration");
- } else {
- conf.validate();
- }
- if ((null == uri) || (null == uri.getAuthority()) || (null == uri.getPath())) {
- throw new IllegalArgumentException("Incorrect ZK URI");
- }
- }
-
- /**
- * Validate the stream name.
- *
- * @param nameOfStream
- * name of stream
- * @throws InvalidStreamNameException
- */
- public static void validateName(String nameOfStream)
- throws InvalidStreamNameException {
- String reason = null;
- char chars[] = nameOfStream.toCharArray();
- char c;
- // validate the stream to see if meet zookeeper path's requirement
- for (int i = 0; i < chars.length; i++) {
- c = chars[i];
-
- if (c == 0) {
- reason = "null character not allowed @" + i;
- break;
- } else if (c == '/') {
- reason = "'/' not allowed @" + i;
- break;
- } else if (c > '\u0000' && c < '\u001f'
- || c > '\u007f' && c < '\u009F'
- || c > '\ud800' && c < '\uf8ff'
- || c > '\ufff0' && c < '\uffff') {
- reason = "invalid charater @" + i;
- break;
- }
- }
- if (null != reason) {
- throw new InvalidStreamNameException(nameOfStream, reason);
- }
- if (isReservedStreamName(nameOfStream)) {
- throw new InvalidStreamNameException(nameOfStream,
- "Stream Name is reserved");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
new file mode 100644
index 0000000..5921233
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.distributedlog.BookKeeperClient;
+import com.twitter.distributedlog.BookKeeperClientBuilder;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClientBuilder;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.acl.DefaultAccessControlManager;
+import com.twitter.distributedlog.impl.acl.ZKAccessControlManager;
+import com.twitter.distributedlog.bk.LedgerAllocator;
+import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
+import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogMetadataForReader;
+import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
+import com.twitter.distributedlog.namespace.NamespaceDriverManager;
+import com.twitter.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.Utils;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName;
+import static com.twitter.distributedlog.util.DLUtils.validateName;
+
+/**
+ * Manager for ZooKeeper/BookKeeper based namespace
+ */
+public class BKNamespaceDriver implements NamespaceDriver {
+
+ private static Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class);
+
+ // register itself
+ static {
+ NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class);
+ }
+
+ /**
+ * Extract zk servers fro dl <i>namespace</i>.
+ *
+ * @param uri dl namespace
+ * @return zk servers
+ */
+ public static String getZKServersFromDLUri(URI uri) {
+ return uri.getAuthority().replace(";", ",");
+ }
+
+ // resources (passed from initialization)
+ private DistributedLogConfiguration conf;
+ private DynamicDistributedLogConfiguration dynConf;
+ private URI namespace;
+ private OrderedScheduler scheduler;
+ private FeatureProvider featureProvider;
+ private AsyncFailureInjector failureInjector;
+ private StatsLogger statsLogger;
+ private StatsLogger perLogStatsLogger;
+ private String clientId;
+ private int regionId;
+
+ //
+ // resources (created internally and initialized at #initialize())
+ //
+
+ // namespace binding
+ private BKDLConfig bkdlConfig;
+
+ // zookeeper clients
+ // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
+ // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
+ // keep builders and their client wrappers here, as they will be used when
+ // instantiating readers or writers.
+ private ZooKeeperClientBuilder sharedWriterZKCBuilder;
+ private ZooKeeperClient writerZKC;
+ private ZooKeeperClientBuilder sharedReaderZKCBuilder;
+ private ZooKeeperClient readerZKC;
+ // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by
+ // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to
+ // keep builders and their client wrappers here, as they will be used when
+ // instantiating readers or writers.
+ private ClientSocketChannelFactory channelFactory;
+ private HashedWheelTimer requestTimer;
+ private BookKeeperClientBuilder sharedWriterBKCBuilder;
+ private BookKeeperClient writerBKC;
+ private BookKeeperClientBuilder sharedReaderBKCBuilder;
+ private BookKeeperClient readerBKC;
+
+ // log stream metadata store
+ private LogMetadataStore metadataStore;
+ private LogStreamMetadataStore writerStreamMetadataStore;
+ private LogStreamMetadataStore readerStreamMetadataStore;
+
+ //
+ // resources (lazily initialized)
+ //
+
+ // ledger allocator
+ private LedgerAllocator allocator;
+
+ // log segment entry stores
+ private LogSegmentEntryStore writerEntryStore;
+ private LogSegmentEntryStore readerEntryStore;
+
+ // access control manager
+ private AccessControlManager accessControlManager;
+
+ //
+ // states
+ //
+ protected boolean initialized = false;
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Public constructor for reflection.
+ */
+ public BKNamespaceDriver() {
+ }
+
+ @Override
+ public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf,
+ DynamicDistributedLogConfiguration dynConf,
+ URI namespace,
+ OrderedScheduler scheduler,
+ FeatureProvider featureProvider,
+ AsyncFailureInjector failureInjector,
+ StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ String clientId,
+ int regionId) throws IOException {
+ if (initialized) {
+ return this;
+ }
+ // validate the namespace
+ if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) {
+ throw new IOException("Incorrect distributedlog namespace : " + namespace);
+ }
+
+ // initialize the resources
+ this.conf = conf;
+ this.dynConf = dynConf;
+ this.namespace = namespace;
+ this.scheduler = scheduler;
+ this.featureProvider = featureProvider;
+ this.failureInjector = failureInjector;
+ this.statsLogger = statsLogger;
+ this.perLogStatsLogger = perLogStatsLogger;
+ this.clientId = clientId;
+ this.regionId = regionId;
+
+ // initialize the zookeeper clients
+ initializeZooKeeperClients();
+
+ // initialize the bookkeeper clients
+ initializeBookKeeperClients();
+
+ // propagate bkdlConfig to configuration
+ BKDLConfig.propagateConfiguration(bkdlConfig, conf);
+
+ // initialize the log metadata & stream metadata store
+ initializeLogStreamMetadataStores();
+
+ // initialize other resources
+ initializeOtherResources();
+
+ initialized = true;
+
+ LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.",
+ new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()});
+ return this;
+ }
+
+ private void initializeZooKeeperClients() throws IOException {
+ // Build the namespace zookeeper client
+ this.sharedWriterZKCBuilder = createZKClientBuilder(
+ String.format("dlzk:%s:factory_writer_shared", namespace),
+ conf,
+ getZKServersFromDLUri(namespace),
+ statsLogger.scope("dlzk_factory_writer_shared"));
+ this.writerZKC = sharedWriterZKCBuilder.build();
+
+ // Resolve namespace binding
+ this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace);
+
+ // Build zookeeper client for readers
+ if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) {
+ this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder;
+ } else {
+ this.sharedReaderZKCBuilder = createZKClientBuilder(
+ String.format("dlzk:%s:factory_reader_shared", namespace),
+ conf,
+ bkdlConfig.getDlZkServersForReader(),
+ statsLogger.scope("dlzk_factory_reader_shared"));
+ }
+ this.readerZKC = this.sharedReaderZKCBuilder.build();
+ }
+
+ private synchronized BKDLConfig getBkdlConfig() {
+ return bkdlConfig;
+ }
+
+ private void initializeBookKeeperClients() throws IOException {
+ this.channelFactory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
+ conf.getBKClientNumberIOThreads());
+ this.requestTimer = new HashedWheelTimer(
+ new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
+ conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+ conf.getTimeoutTimerNumTicks());
+ // Build bookkeeper client for writers
+ this.sharedWriterBKCBuilder = createBKCBuilder(
+ String.format("bk:%s:factory_writer_shared", namespace),
+ conf,
+ bkdlConfig.getBkZkServersForWriter(),
+ bkdlConfig.getBkLedgersPath(),
+ channelFactory,
+ requestTimer,
+ Optional.of(featureProvider.scope("bkc")),
+ statsLogger);
+ this.writerBKC = this.sharedWriterBKCBuilder.build();
+
+ // Build bookkeeper client for readers
+ if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) {
+ this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
+ } else {
+ this.sharedReaderBKCBuilder = createBKCBuilder(
+ String.format("bk:%s:factory_reader_shared", namespace),
+ conf,
+ bkdlConfig.getBkZkServersForReader(),
+ bkdlConfig.getBkLedgersPath(),
+ channelFactory,
+ requestTimer,
+ Optional.<FeatureProvider>absent(),
+ statsLogger);
+ }
+ this.readerBKC = this.sharedReaderBKCBuilder.build();
+ }
+
+ private void initializeLogStreamMetadataStores() throws IOException {
+ // log metadata store
+ if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) {
+ this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
+ } else {
+ this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
+ }
+
+ // create log stream metadata store
+ this.writerStreamMetadataStore =
+ new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ writerZKC,
+ scheduler,
+ statsLogger);
+ this.readerStreamMetadataStore =
+ new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ readerZKC,
+ scheduler,
+ statsLogger);
+ }
+
+ @VisibleForTesting
+ public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException {
+ String poolPath = conf.getLedgerAllocatorPoolPath();
+ LOG.info("PoolPath is {}", poolPath);
+ if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) {
+ LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
+ throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
+ }
+ String poolName = conf.getLedgerAllocatorPoolName();
+ if (null == poolName) {
+ LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
+ throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
+ }
+ String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName;
+ try {
+ PathUtils.validatePath(rootPath);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
+ throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
+ }
+ return rootPath;
+ }
+
+ private void initializeOtherResources() throws IOException {
+ // Ledger allocator
+ if (conf.getEnableLedgerAllocatorPool()) {
+ String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace);
+ allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(
+ allocatorPoolPath,
+ conf.getLedgerAllocatorPoolCoreSize(),
+ conf,
+ writerZKC,
+ writerBKC,
+ scheduler);
+ if (null != allocator) {
+ allocator.start();
+ }
+ LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize());
+ } else {
+ allocator = null;
+ }
+
+ }
+
+ private void checkState() throws IOException {
+ if (closed.get()) {
+ LOG.error("BK namespace driver {} is already closed", namespace);
+ throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ doClose();
+ }
+
+ private void doClose() {
+ if (null != accessControlManager) {
+ accessControlManager.close();
+ LOG.info("Access Control Manager Stopped.");
+ }
+
+ // Close the allocator
+ if (null != allocator) {
+ Utils.closeQuietly(allocator);
+ LOG.info("Ledger Allocator stopped.");
+ }
+
+ // Shutdown log segment metadata stores
+ Utils.close(writerStreamMetadataStore);
+ Utils.close(readerStreamMetadataStore);
+
+ writerBKC.close();
+ readerBKC.close();
+ writerZKC.close();
+ readerZKC.close();
+ // release bookkeeper resources
+ channelFactory.releaseExternalResources();
+ LOG.info("Release external resources used by channel factory.");
+ requestTimer.stop();
+ LOG.info("Stopped request timer");
+ }
+
+ @Override
+ public URI getUri() {
+ return namespace;
+ }
+
+ @Override
+ public String getScheme() {
+ return DistributedLogConstants.BACKEND_BK;
+ }
+
+ @Override
+ public LogMetadataStore getLogMetadataStore() {
+ return metadataStore;
+ }
+
+ @Override
+ public LogStreamMetadataStore getLogStreamMetadataStore(Role role) {
+ if (Role.WRITER == role) {
+ return writerStreamMetadataStore;
+ } else {
+ return readerStreamMetadataStore;
+ }
+ }
+
+ @Override
+ public LogSegmentEntryStore getLogSegmentEntryStore(Role role) {
+ if (Role.WRITER == role) {
+ return getWriterEntryStore();
+ } else {
+ return getReaderEntryStore();
+ }
+ }
+
+ private LogSegmentEntryStore getWriterEntryStore() {
+ if (null == writerEntryStore) {
+ writerEntryStore = new BKLogSegmentEntryStore(
+ conf,
+ dynConf,
+ writerZKC,
+ writerBKC,
+ scheduler,
+ allocator,
+ statsLogger,
+ failureInjector);
+ }
+ return writerEntryStore;
+ }
+
+ private LogSegmentEntryStore getReaderEntryStore() {
+ if (null == readerEntryStore) {
+ readerEntryStore = new BKLogSegmentEntryStore(
+ conf,
+ dynConf,
+ writerZKC,
+ readerBKC,
+ scheduler,
+ allocator,
+ statsLogger,
+ failureInjector);
+ }
+ return readerEntryStore;
+ }
+
+ @Override
+ public AccessControlManager getAccessControlManager() throws IOException {
+ if (null == accessControlManager) {
+ String aclRootPath = getBkdlConfig().getACLRootPath();
+ // Build the access control manager
+ if (aclRootPath == null) {
+ accessControlManager = DefaultAccessControlManager.INSTANCE;
+ LOG.info("Created default access control manager for {}", namespace);
+ } else {
+ if (!isReservedStreamName(aclRootPath)) {
+ throw new IOException("Invalid Access Control List Root Path : " + aclRootPath);
+ }
+ String zkRootPath = namespace.getPath() + "/" + aclRootPath;
+ LOG.info("Creating zk based access control manager @ {} for {}",
+ zkRootPath, namespace);
+ accessControlManager = new ZKAccessControlManager(conf, readerZKC,
+ zkRootPath, scheduler);
+ LOG.info("Created zk based access control manager @ {} for {}",
+ zkRootPath, namespace);
+ }
+ }
+ return accessControlManager;
+ }
+
+ @Override
+ public SubscriptionsStore getSubscriptionsStore(String streamName) {
+ return new ZKSubscriptionsStore(
+ writerZKC,
+ LogMetadataForReader.getSubscribersPath(namespace, streamName, conf.getUnpartitionedStreamName()));
+ }
+
+ //
+ // Legacy Intefaces
+ //
+
+ @Override
+ public MetadataAccessor getMetadataAccessor(String streamName)
+ throws InvalidStreamNameException, IOException {
+ if (getBkdlConfig().isFederatedNamespace()) {
+ throw new UnsupportedOperationException();
+ }
+ checkState();
+ validateName(streamName);
+ return new ZKMetadataAccessor(
+ streamName,
+ conf,
+ namespace,
+ sharedWriterZKCBuilder,
+ sharedReaderZKCBuilder,
+ statsLogger);
+ }
+
+ public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
+ throws IOException, IllegalArgumentException {
+ String namespaceRootPath = namespace.getPath();
+ HashMap<String, byte[]> result = new HashMap<String, byte[]>();
+ ZooKeeperClient zkc = writerZKC;
+ try {
+ ZooKeeper zk = Utils.sync(zkc, namespaceRootPath);
+ Stat currentStat = zk.exists(namespaceRootPath, false);
+ if (currentStat == null) {
+ return result;
+ }
+ List<String> children = zk.getChildren(namespaceRootPath, false);
+ for(String child: children) {
+ if (isReservedStreamName(child)) {
+ continue;
+ }
+ String zkPath = String.format("%s/%s", namespaceRootPath, child);
+ currentStat = zk.exists(zkPath, false);
+ if (currentStat == null) {
+ result.put(child, new byte[0]);
+ } else {
+ result.put(child, zk.getData(zkPath, false, currentStat));
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while deleting " + namespaceRootPath, ie);
+ throw new IOException("Interrupted while reading " + namespaceRootPath, ie);
+ } catch (KeeperException ke) {
+ LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
+ throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
+ }
+ return result;
+ }
+
+ //
+ // Zk & Bk Utils
+ //
+
+ public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName,
+ DistributedLogConfiguration conf,
+ String zkServers,
+ StatsLogger statsLogger) {
+ RetryPolicy retryPolicy = null;
+ if (conf.getZKNumRetries() > 0) {
+ retryPolicy = new BoundExponentialBackoffRetryPolicy(
+ conf.getZKRetryBackoffStartMillis(),
+ conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
+ }
+ ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
+ .name(zkcName)
+ .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+ .retryThreadCount(conf.getZKClientNumberRetryThreads())
+ .requestRateLimit(conf.getZKRequestRateLimit())
+ .zkServers(zkServers)
+ .retryPolicy(retryPolicy)
+ .statsLogger(statsLogger)
+ .zkAclId(conf.getZkAclId());
+ LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
+ + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(),
+ conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(),
+ conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() });
+ return builder;
+ }
+
+ private BookKeeperClientBuilder createBKCBuilder(String bkcName,
+ DistributedLogConfiguration conf,
+ String zkServers,
+ String ledgersPath,
+ ClientSocketChannelFactory channelFactory,
+ HashedWheelTimer requestTimer,
+ Optional<FeatureProvider> featureProviderOptional,
+ StatsLogger statsLogger) {
+ BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
+ .name(bkcName)
+ .dlConfig(conf)
+ .zkServers(zkServers)
+ .ledgersPath(ledgersPath)
+ .channelFactory(channelFactory)
+ .requestTimer(requestTimer)
+ .featureProvider(featureProviderOptional)
+ .statsLogger(statsLogger);
+ LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
+ new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() });
+ return builder;
+ }
+
+ //
+ // Test Methods
+ //
+
+ @VisibleForTesting
+ public ZooKeeperClient getWriterZKC() {
+ return writerZKC;
+ }
+
+ @VisibleForTesting
+ public BookKeeperClient getReaderBKC() {
+ return readerBKC;
+ }
+
+ @VisibleForTesting
+ public AsyncFailureInjector getFailureInjector() {
+ return this.failureInjector;
+ }
+
+ @VisibleForTesting
+ public LogStreamMetadataStore getWriterStreamMetadataStore() {
+ return writerStreamMetadataStore;
+ }
+
+ @VisibleForTesting
+ public LedgerAllocator getLedgerAllocator() {
+ return allocator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
index b84ab2e..50b1405 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
@@ -36,7 +36,7 @@ import java.net.URI;
import java.util.Iterator;
import java.util.List;
-import static com.twitter.distributedlog.impl.BKDLUtils.*;
+import static com.twitter.distributedlog.util.DLUtils.*;
/**
* ZooKeeper based log metadata store