You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:29 UTC
[24/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
deleted file mode 100644
index 5921233..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.BookKeeperClientBuilder;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.MetadataAccessor;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.acl.DefaultAccessControlManager;
-import com.twitter.distributedlog.impl.acl.ZKAccessControlManager;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
-import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.namespace.NamespaceDriverManager;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName;
-import static com.twitter.distributedlog.util.DLUtils.validateName;
-
-/**
- * Manager for ZooKeeper/BookKeeper based namespace
- */
-public class BKNamespaceDriver implements NamespaceDriver {
-
- private static Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class);
-
- // register itself
- static {
- NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class);
- }
-
- /**
- * Extract zk servers from dl <i>namespace</i>.
- *
- * @param uri dl namespace
- * @return zk servers
- */
- public static String getZKServersFromDLUri(URI uri) {
- return uri.getAuthority().replace(";", ",");
- }
-
- // resources (passed from initialization)
- private DistributedLogConfiguration conf;
- private DynamicDistributedLogConfiguration dynConf;
- private URI namespace;
- private OrderedScheduler scheduler;
- private FeatureProvider featureProvider;
- private AsyncFailureInjector failureInjector;
- private StatsLogger statsLogger;
- private StatsLogger perLogStatsLogger;
- private String clientId;
- private int regionId;
-
- //
- // resources (created internally and initialized at #initialize())
- //
-
- // namespace binding
- private BKDLConfig bkdlConfig;
-
- // zookeeper clients
- // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
- // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
- // keep builders and their client wrappers here, as they will be used when
- // instantiating readers or writers.
- private ZooKeeperClientBuilder sharedWriterZKCBuilder;
- private ZooKeeperClient writerZKC;
- private ZooKeeperClientBuilder sharedReaderZKCBuilder;
- private ZooKeeperClient readerZKC;
- // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by
- // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to
- // keep builders and their client wrappers here, as they will be used when
- // instantiating readers or writers.
- private ClientSocketChannelFactory channelFactory;
- private HashedWheelTimer requestTimer;
- private BookKeeperClientBuilder sharedWriterBKCBuilder;
- private BookKeeperClient writerBKC;
- private BookKeeperClientBuilder sharedReaderBKCBuilder;
- private BookKeeperClient readerBKC;
-
- // log stream metadata store
- private LogMetadataStore metadataStore;
- private LogStreamMetadataStore writerStreamMetadataStore;
- private LogStreamMetadataStore readerStreamMetadataStore;
-
- //
- // resources (lazily initialized)
- //
-
- // ledger allocator
- private LedgerAllocator allocator;
-
- // log segment entry stores
- private LogSegmentEntryStore writerEntryStore;
- private LogSegmentEntryStore readerEntryStore;
-
- // access control manager
- private AccessControlManager accessControlManager;
-
- //
- // states
- //
- protected boolean initialized = false;
- protected AtomicBoolean closed = new AtomicBoolean(false);
-
- /**
- * Public constructor for reflection.
- */
- public BKNamespaceDriver() {
- }
-
- @Override
- public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf,
- DynamicDistributedLogConfiguration dynConf,
- URI namespace,
- OrderedScheduler scheduler,
- FeatureProvider featureProvider,
- AsyncFailureInjector failureInjector,
- StatsLogger statsLogger,
- StatsLogger perLogStatsLogger,
- String clientId,
- int regionId) throws IOException {
- if (initialized) {
- return this;
- }
- // validate the namespace
- if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) {
- throw new IOException("Incorrect distributedlog namespace : " + namespace);
- }
-
- // initialize the resources
- this.conf = conf;
- this.dynConf = dynConf;
- this.namespace = namespace;
- this.scheduler = scheduler;
- this.featureProvider = featureProvider;
- this.failureInjector = failureInjector;
- this.statsLogger = statsLogger;
- this.perLogStatsLogger = perLogStatsLogger;
- this.clientId = clientId;
- this.regionId = regionId;
-
- // initialize the zookeeper clients
- initializeZooKeeperClients();
-
- // initialize the bookkeeper clients
- initializeBookKeeperClients();
-
- // propagate bkdlConfig to configuration
- BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-
- // initialize the log metadata & stream metadata store
- initializeLogStreamMetadataStores();
-
- // initialize other resources
- initializeOtherResources();
-
- initialized = true;
-
- LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.",
- new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()});
- return this;
- }
-
- private void initializeZooKeeperClients() throws IOException {
- // Build the namespace zookeeper client
- this.sharedWriterZKCBuilder = createZKClientBuilder(
- String.format("dlzk:%s:factory_writer_shared", namespace),
- conf,
- getZKServersFromDLUri(namespace),
- statsLogger.scope("dlzk_factory_writer_shared"));
- this.writerZKC = sharedWriterZKCBuilder.build();
-
- // Resolve namespace binding
- this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace);
-
- // Build zookeeper client for readers
- if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) {
- this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder;
- } else {
- this.sharedReaderZKCBuilder = createZKClientBuilder(
- String.format("dlzk:%s:factory_reader_shared", namespace),
- conf,
- bkdlConfig.getDlZkServersForReader(),
- statsLogger.scope("dlzk_factory_reader_shared"));
- }
- this.readerZKC = this.sharedReaderZKCBuilder.build();
- }
-
- private synchronized BKDLConfig getBkdlConfig() {
- return bkdlConfig;
- }
-
- private void initializeBookKeeperClients() throws IOException {
- this.channelFactory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
- conf.getBKClientNumberIOThreads());
- this.requestTimer = new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
- conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
- conf.getTimeoutTimerNumTicks());
- // Build bookkeeper client for writers
- this.sharedWriterBKCBuilder = createBKCBuilder(
- String.format("bk:%s:factory_writer_shared", namespace),
- conf,
- bkdlConfig.getBkZkServersForWriter(),
- bkdlConfig.getBkLedgersPath(),
- channelFactory,
- requestTimer,
- Optional.of(featureProvider.scope("bkc")),
- statsLogger);
- this.writerBKC = this.sharedWriterBKCBuilder.build();
-
- // Build bookkeeper client for readers
- if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) {
- this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
- } else {
- this.sharedReaderBKCBuilder = createBKCBuilder(
- String.format("bk:%s:factory_reader_shared", namespace),
- conf,
- bkdlConfig.getBkZkServersForReader(),
- bkdlConfig.getBkLedgersPath(),
- channelFactory,
- requestTimer,
- Optional.<FeatureProvider>absent(),
- statsLogger);
- }
- this.readerBKC = this.sharedReaderBKCBuilder.build();
- }
-
- private void initializeLogStreamMetadataStores() throws IOException {
- // log metadata store
- if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) {
- this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
- } else {
- this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler);
- }
-
- // create log stream metadata store
- this.writerStreamMetadataStore =
- new ZKLogStreamMetadataStore(
- clientId,
- conf,
- writerZKC,
- scheduler,
- statsLogger);
- this.readerStreamMetadataStore =
- new ZKLogStreamMetadataStore(
- clientId,
- conf,
- readerZKC,
- scheduler,
- statsLogger);
- }
-
- @VisibleForTesting
- public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException {
- String poolPath = conf.getLedgerAllocatorPoolPath();
- LOG.info("PoolPath is {}", poolPath);
- if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) {
- LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
- throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
- }
- String poolName = conf.getLedgerAllocatorPoolName();
- if (null == poolName) {
- LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
- throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
- }
- String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName;
- try {
- PathUtils.validatePath(rootPath);
- } catch (IllegalArgumentException iae) {
- LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath);
- throw new IOException("Invalid ledger allocator pool path specified : " + poolPath);
- }
- return rootPath;
- }
-
- private void initializeOtherResources() throws IOException {
- // Ledger allocator
- if (conf.getEnableLedgerAllocatorPool()) {
- String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace);
- allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(
- allocatorPoolPath,
- conf.getLedgerAllocatorPoolCoreSize(),
- conf,
- writerZKC,
- writerBKC,
- scheduler);
- if (null != allocator) {
- allocator.start();
- }
- LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize());
- } else {
- allocator = null;
- }
-
- }
-
- private void checkState() throws IOException {
- if (closed.get()) {
- LOG.error("BK namespace driver {} is already closed", namespace);
- throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed");
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!closed.compareAndSet(false, true)) {
- return;
- }
- doClose();
- }
-
- private void doClose() {
- if (null != accessControlManager) {
- accessControlManager.close();
- LOG.info("Access Control Manager Stopped.");
- }
-
- // Close the allocator
- if (null != allocator) {
- Utils.closeQuietly(allocator);
- LOG.info("Ledger Allocator stopped.");
- }
-
- // Shutdown log segment metadata stores
- Utils.close(writerStreamMetadataStore);
- Utils.close(readerStreamMetadataStore);
-
- writerBKC.close();
- readerBKC.close();
- writerZKC.close();
- readerZKC.close();
- // release bookkeeper resources
- channelFactory.releaseExternalResources();
- LOG.info("Release external resources used by channel factory.");
- requestTimer.stop();
- LOG.info("Stopped request timer");
- }
-
- @Override
- public URI getUri() {
- return namespace;
- }
-
- @Override
- public String getScheme() {
- return DistributedLogConstants.BACKEND_BK;
- }
-
- @Override
- public LogMetadataStore getLogMetadataStore() {
- return metadataStore;
- }
-
- @Override
- public LogStreamMetadataStore getLogStreamMetadataStore(Role role) {
- if (Role.WRITER == role) {
- return writerStreamMetadataStore;
- } else {
- return readerStreamMetadataStore;
- }
- }
-
- @Override
- public LogSegmentEntryStore getLogSegmentEntryStore(Role role) {
- if (Role.WRITER == role) {
- return getWriterEntryStore();
- } else {
- return getReaderEntryStore();
- }
- }
-
- private LogSegmentEntryStore getWriterEntryStore() {
- if (null == writerEntryStore) {
- writerEntryStore = new BKLogSegmentEntryStore(
- conf,
- dynConf,
- writerZKC,
- writerBKC,
- scheduler,
- allocator,
- statsLogger,
- failureInjector);
- }
- return writerEntryStore;
- }
-
- private LogSegmentEntryStore getReaderEntryStore() {
- if (null == readerEntryStore) {
- readerEntryStore = new BKLogSegmentEntryStore(
- conf,
- dynConf,
- writerZKC,
- readerBKC,
- scheduler,
- allocator,
- statsLogger,
- failureInjector);
- }
- return readerEntryStore;
- }
-
- @Override
- public AccessControlManager getAccessControlManager() throws IOException {
- if (null == accessControlManager) {
- String aclRootPath = getBkdlConfig().getACLRootPath();
- // Build the access control manager
- if (aclRootPath == null) {
- accessControlManager = DefaultAccessControlManager.INSTANCE;
- LOG.info("Created default access control manager for {}", namespace);
- } else {
- if (!isReservedStreamName(aclRootPath)) {
- throw new IOException("Invalid Access Control List Root Path : " + aclRootPath);
- }
- String zkRootPath = namespace.getPath() + "/" + aclRootPath;
- LOG.info("Creating zk based access control manager @ {} for {}",
- zkRootPath, namespace);
- accessControlManager = new ZKAccessControlManager(conf, readerZKC,
- zkRootPath, scheduler);
- LOG.info("Created zk based access control manager @ {} for {}",
- zkRootPath, namespace);
- }
- }
- return accessControlManager;
- }
-
- @Override
- public SubscriptionsStore getSubscriptionsStore(String streamName) {
- return new ZKSubscriptionsStore(
- writerZKC,
- LogMetadataForReader.getSubscribersPath(namespace, streamName, conf.getUnpartitionedStreamName()));
- }
-
- //
- // Legacy Interfaces
- //
-
- @Override
- public MetadataAccessor getMetadataAccessor(String streamName)
- throws InvalidStreamNameException, IOException {
- if (getBkdlConfig().isFederatedNamespace()) {
- throw new UnsupportedOperationException();
- }
- checkState();
- validateName(streamName);
- return new ZKMetadataAccessor(
- streamName,
- conf,
- namespace,
- sharedWriterZKCBuilder,
- sharedReaderZKCBuilder,
- statsLogger);
- }
-
- public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
- throws IOException, IllegalArgumentException {
- String namespaceRootPath = namespace.getPath();
- HashMap<String, byte[]> result = new HashMap<String, byte[]>();
- ZooKeeperClient zkc = writerZKC;
- try {
- ZooKeeper zk = Utils.sync(zkc, namespaceRootPath);
- Stat currentStat = zk.exists(namespaceRootPath, false);
- if (currentStat == null) {
- return result;
- }
- List<String> children = zk.getChildren(namespaceRootPath, false);
- for(String child: children) {
- if (isReservedStreamName(child)) {
- continue;
- }
- String zkPath = String.format("%s/%s", namespaceRootPath, child);
- currentStat = zk.exists(zkPath, false);
- if (currentStat == null) {
- result.put(child, new byte[0]);
- } else {
- result.put(child, zk.getData(zkPath, false, currentStat));
- }
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while deleting " + namespaceRootPath, ie);
- throw new IOException("Interrupted while reading " + namespaceRootPath, ie);
- } catch (KeeperException ke) {
- LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
- throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke);
- }
- return result;
- }
-
- //
- // Zk & Bk Utils
- //
-
- public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName,
- DistributedLogConfiguration conf,
- String zkServers,
- StatsLogger statsLogger) {
- RetryPolicy retryPolicy = null;
- if (conf.getZKNumRetries() > 0) {
- retryPolicy = new BoundExponentialBackoffRetryPolicy(
- conf.getZKRetryBackoffStartMillis(),
- conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
- }
- ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
- .name(zkcName)
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkServers(zkServers)
- .retryPolicy(retryPolicy)
- .statsLogger(statsLogger)
- .zkAclId(conf.getZkAclId());
- LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
- + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(),
- conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(),
- conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() });
- return builder;
- }
-
- private BookKeeperClientBuilder createBKCBuilder(String bkcName,
- DistributedLogConfiguration conf,
- String zkServers,
- String ledgersPath,
- ClientSocketChannelFactory channelFactory,
- HashedWheelTimer requestTimer,
- Optional<FeatureProvider> featureProviderOptional,
- StatsLogger statsLogger) {
- BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
- .name(bkcName)
- .dlConfig(conf)
- .zkServers(zkServers)
- .ledgersPath(ledgersPath)
- .channelFactory(channelFactory)
- .requestTimer(requestTimer)
- .featureProvider(featureProviderOptional)
- .statsLogger(statsLogger);
- LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}",
- new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() });
- return builder;
- }
-
- //
- // Test Methods
- //
-
- @VisibleForTesting
- public ZooKeeperClient getWriterZKC() {
- return writerZKC;
- }
-
- @VisibleForTesting
- public BookKeeperClient getReaderBKC() {
- return readerBKC;
- }
-
- @VisibleForTesting
- public AsyncFailureInjector getFailureInjector() {
- return this.failureInjector;
- }
-
- @VisibleForTesting
- public LogStreamMetadataStore getWriterStreamMetadataStore() {
- return writerStreamMetadataStore;
- }
-
- @VisibleForTesting
- public LedgerAllocator getLedgerAllocator() {
- return allocator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
deleted file mode 100644
index 50b1405..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import java.net.URI;
-import java.util.Iterator;
-import java.util.List;
-
-import static com.twitter.distributedlog.util.DLUtils.*;
-
-/**
- * ZooKeeper based log metadata store
- */
-public class ZKLogMetadataStore implements LogMetadataStore {
-
- final URI namespace;
- final Optional<URI> nsOptional;
- final ZooKeeperClient zkc;
- final ZKNamespaceWatcher nsWatcher;
-
- public ZKLogMetadataStore(
- DistributedLogConfiguration conf,
- URI namespace,
- ZooKeeperClient zkc,
- OrderedScheduler scheduler) {
- this.namespace = namespace;
- this.nsOptional = Optional.of(this.namespace);
- this.zkc = zkc;
- this.nsWatcher = new ZKNamespaceWatcher(conf, namespace, zkc, scheduler);
- }
-
- @Override
- public Future<URI> createLog(String logName) {
- return Future.value(namespace);
- }
-
- @Override
- public Future<Optional<URI>> getLogLocation(String logName) {
- return Future.value(nsOptional);
- }
-
- @Override
- public Future<Iterator<String>> getLogs() {
- final Promise<Iterator<String>> promise = new Promise<Iterator<String>>();
- final String nsRootPath = namespace.getPath();
- try {
- final ZooKeeper zk = zkc.get();
- zk.sync(nsRootPath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int syncRc, String syncPath, Object ctx) {
- if (KeeperException.Code.OK.intValue() == syncRc) {
- zk.getChildren(nsRootPath, false, new AsyncCallback.Children2Callback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- List<String> results = Lists.newArrayListWithExpectedSize(children.size());
- for (String child : children) {
- if (!isReservedStreamName(child)) {
- results.add(child);
- }
- }
- promise.setValue(results.iterator());
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- List<String> streams = Lists.newLinkedList();
- promise.setValue(streams.iterator());
- } else {
- promise.setException(new ZKException("Error reading namespace " + nsRootPath,
- KeeperException.Code.get(rc)));
- }
- }
- }, null);
- } else if (KeeperException.Code.NONODE.intValue() == syncRc) {
- List<String> streams = Lists.newLinkedList();
- promise.setValue(streams.iterator());
- } else {
- promise.setException(new ZKException("Error reading namespace " + nsRootPath,
- KeeperException.Code.get(syncRc)));
- }
- }
- }, null);
- zkc.get();
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- } catch (InterruptedException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- @Override
- public void registerNamespaceListener(NamespaceListener listener) {
- this.nsWatcher.registerListener(listener);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
deleted file mode 100644
index e55b2f2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Filters based on current zookeeper log segments.
- */
-public class ZKLogSegmentFilters {
-
- static final Logger LOG = LoggerFactory.getLogger(ZKLogSegmentFilters.class);
-
- /**
- * Write handler filter should return all inprogress log segments and the last completed log segment,
- * because sequence id & ledger sequence number assignment relies on previous log segments.
- */
- public static final LogSegmentFilter WRITE_HANDLE_FILTER = new LogSegmentFilter() {
- @Override
- public Collection<String> filter(Collection<String> fullList) {
- List<String> result = new ArrayList<String>(fullList.size());
- String lastCompletedLogSegmentName = null;
- long lastLogSegmentSequenceNumber = -1L;
- for (String s : fullList) {
- if (s.startsWith(DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX)) {
- result.add(s);
- } else if (s.startsWith(DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX)) {
- String[] parts = s.split("_");
- try {
- if (2 == parts.length) {
- // name: logrecs_<logsegment_sequence_number>
- long logSegmentSequenceNumber = Long.parseLong(parts[1]);
- if (logSegmentSequenceNumber > lastLogSegmentSequenceNumber) {
- lastLogSegmentSequenceNumber = logSegmentSequenceNumber;
- lastCompletedLogSegmentName = s;
- }
- } else if (6 == parts.length) {
- // name: logrecs_<start_tx_id>_<end_tx_id>_<logsegment_sequence_number>_<ledger_id>_<region_id>
- long logSegmentSequenceNumber = Long.parseLong(parts[3]);
- if (logSegmentSequenceNumber > lastLogSegmentSequenceNumber) {
- lastLogSegmentSequenceNumber = logSegmentSequenceNumber;
- lastCompletedLogSegmentName = s;
- }
- } else {
- // name: logrecs_<start_tx_id>_<end_tx_id> or any unknown names
- // we don't know the ledger sequence from the name, so add it to the list
- result.add(s);
- }
- } catch (NumberFormatException nfe) {
- LOG.warn("Unexpected sequence number in log segment {} :", s, nfe);
- result.add(s);
- }
- } else {
- LOG.error("Unknown log segment name : {}", s);
- }
- }
- if (null != lastCompletedLogSegmentName) {
- result.add(lastCompletedLogSegmentName);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Filtered log segments {} from {}.", result, fullList);
- }
- return result;
- }
- };
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
deleted file mode 100644
index 2076dd8..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ /dev/null
@@ -1,503 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.distributedlog.zk.DefaultZKOp;
-import com.twitter.distributedlog.zk.ZKOp;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * ZooKeeper based log segment metadata store.
- */
-public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watcher, Children2Callback {
-
- private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
-
- private static final List<String> EMPTY_LIST = ImmutableList.of();
-
- private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
-
- private final String logSegmentsPath;
- private final ZKLogSegmentMetadataStore store;
- private int currentZKBackOffMs;
-
- ReadLogSegmentsTask(String logSegmentsPath,
- ZKLogSegmentMetadataStore metadataStore) {
- this.logSegmentsPath = logSegmentsPath;
- this.store = metadataStore;
- this.currentZKBackOffMs = store.minZKBackoffMs;
- }
-
- @Override
- public void onSuccess(final Versioned<List<String>> segments) {
- // reset the back off after a successful operation
- currentZKBackOffMs = store.minZKBackoffMs;
- store.notifyLogSegmentsUpdated(
- logSegmentsPath,
- store.listeners.get(logSegmentsPath),
- segments);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- int backoffMs;
- if (cause instanceof LogNotFoundException) {
- // the log has been deleted, remove all the registered listeners and notify them
- store.notifyLogStreamDeleted(logSegmentsPath,
- store.listeners.remove(logSegmentsPath));
- return;
- } else {
- backoffMs = currentZKBackOffMs;
- currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs);
- }
- store.scheduleTask(logSegmentsPath, this, backoffMs);
- }
-
- @Override
- public void run() {
- if (null != store.listeners.get(logSegmentsPath)) {
- store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this);
- } else {
- logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
- }
- }
- }
-
- /**
- * A log segment names listener that tracks the version of the last list of log segments it was notified about.
- * It only forwards a notification when the list of log segments is newer than the last one it was notified about.
- */
- static class VersionedLogSegmentNamesListener {
-
- private final LogSegmentNamesListener listener;
- private Versioned<List<String>> lastNotifiedLogSegments;
-
- VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
- this.listener = listener;
- this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW);
- }
-
- synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) {
- if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
- lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
- lastNotifiedLogSegments = logSegments;
- listener.onSegmentsUpdated(logSegments);
- }
- }
-
- @Override
- public int hashCode() {
- return listener.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof VersionedLogSegmentNamesListener)) {
- return false;
- }
- VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj;
- return listener.equals(other.listener);
- }
-
- @Override
- public String toString() {
- return listener.toString();
- }
- }
-
- final DistributedLogConfiguration conf;
- // settings
- final int minZKBackoffMs;
- final int maxZKBackoffMs;
- final boolean skipMinVersionCheck;
-
- final ZooKeeperClient zkc;
- // log segment listeners
- final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners;
- // scheduler
- final OrderedScheduler scheduler;
- final ReentrantReadWriteLock closeLock;
- boolean closed = false;
-
- public ZKLogSegmentMetadataStore(DistributedLogConfiguration conf,
- ZooKeeperClient zkc,
- OrderedScheduler scheduler) {
- this.conf = conf;
- this.zkc = zkc;
- this.listeners =
- new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
- this.scheduler = scheduler;
- this.closeLock = new ReentrantReadWriteLock();
- // settings
- this.minZKBackoffMs = conf.getZKRetryBackoffStartMillis();
- this.maxZKBackoffMs = conf.getZKRetryBackoffMaxMillis();
- this.skipMinVersionCheck = conf.getDLLedgerMetadataSkipMinVersionCheck();
- }
-
- protected void scheduleTask(Object key, Runnable r, long delayMs) {
- closeLock.readLock().lock();
- try {
- if (closed) {
- return;
- }
- scheduler.schedule(key, r, delayMs, TimeUnit.MILLISECONDS);
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
- protected void submitTask(Object key, Runnable r) {
- closeLock.readLock().lock();
- try {
- if (closed) {
- return;
- }
- scheduler.submit(key, r);
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
- // max sequence number and max transaction id
-
- @Override
- public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
- LogMetadata logMetadata,
- Versioned<Long> lssn,
- Transaction.OpListener<Version> listener) {
- Version version = lssn.getVersion();
- assert(version instanceof ZkVersion);
- ZkVersion zkVersion = (ZkVersion) version;
- byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
- Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion());
- ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
- txn.addOp(zkOp);
- }
-
- @Override
- public void storeMaxTxnId(Transaction<Object> txn,
- LogMetadataForWriter logMetadata,
- Versioned<Long> transactionId,
- Transaction.OpListener<Version> listener) {
- Version version = transactionId.getVersion();
- assert(version instanceof ZkVersion);
- ZkVersion zkVersion = (ZkVersion) version;
- byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
- Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion());
- ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
- txn.addOp(zkOp);
- }
-
- // updates
-
- @Override
- public Transaction<Object> transaction() {
- return new ZKTransaction(zkc);
- }
-
- @Override
- public void createLogSegment(Transaction<Object> txn,
- LogSegmentMetadata segment,
- OpListener<Void> listener) {
- byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
- Op createOp = Op.create(
- segment.getZkPath(),
- finalisedData,
- zkc.getDefaultACL(),
- CreateMode.PERSISTENT);
- txn.addOp(DefaultZKOp.of(createOp, listener));
- }
-
- @Override
- public void deleteLogSegment(Transaction<Object> txn,
- final LogSegmentMetadata segment,
- final OpListener<Void> listener) {
- Op deleteOp = Op.delete(
- segment.getZkPath(),
- -1);
- logger.info("Delete segment : {}", segment);
- txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() {
- @Override
- public void onCommit(Void r) {
- if (null != listener) {
- listener.onCommit(r);
- }
- }
-
- @Override
- public void onAbort(Throwable t) {
- logger.info("Aborted transaction on deleting segment {}", segment);
- KeeperException.Code kc;
- if (t instanceof KeeperException) {
- kc = ((KeeperException) t).code();
- } else if (t instanceof ZKException) {
- kc = ((ZKException) t).getKeeperExceptionCode();
- } else {
- abortListener(t);
- return;
- }
- if (KeeperException.Code.NONODE == kc) {
- abortListener(new LogSegmentNotFoundException(segment.getZkPath()));
- return;
- }
- abortListener(t);
- }
-
- private void abortListener(Throwable t) {
- if (null != listener) {
- listener.onAbort(t);
- }
- }
- }));
- }
-
- @Override
- public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
- byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
- Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1);
- txn.addOp(DefaultZKOp.of(updateOp, null));
- }
-
- // reads
-
- /**
- * Process the watched events for registered listeners
- */
- @Override
- public void process(WatchedEvent event) {
- if (Event.EventType.None == event.getType()
- && Event.KeeperState.Expired == event.getState()) {
- Set<String> keySet = new HashSet<String>(listeners.keySet());
- for (String logSegmentsPath : keySet) {
- scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
- }
- return;
- }
- String path = event.getPath();
- if (null == path) {
- return;
- }
- switch (event.getType()) {
- case NodeDeleted:
- notifyLogStreamDeleted(path, listeners.remove(path));
- break;
- case NodeChildrenChanged:
- new ReadLogSegmentsTask(path, this).run();
- break;
- default:
- break;
- }
- }
-
- @Override
- public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
- return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck);
- }
-
- Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
- Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
- try {
- zkc.get().getChildren(logSegmentsPath, watcher, this, result);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- result.setException(FutureUtils.zkException(e, logSegmentsPath));
- } catch (InterruptedException e) {
- result.setException(FutureUtils.zkException(e, logSegmentsPath));
- }
- return result;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
- if (KeeperException.Code.OK.intValue() == rc) {
- // cversion counts the changes to the children of this znode, so it versions the returned children list
- ZkVersion zkVersion = new ZkVersion(stat.getCversion());
- result.setValue(new Versioned<List<String>>(children, zkVersion));
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setException(new LogNotFoundException("Log " + path + " not found"));
- } else {
- result.setException(new ZKException("Failed to get log segments from " + path,
- KeeperException.Code.get(rc)));
- }
- }
-
- @Override
- public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
- LogSegmentNamesListener listener) {
- // Register the listener (if any) before issuing the read that sets the ZooKeeper watch.
- Watcher zkWatcher;
- if (null == listener) {
- zkWatcher = null;
- } else {
- closeLock.readLock().lock();
- try {
- if (closed) {
- zkWatcher = null;
- } else {
- Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
- listeners.get(logSegmentsPath);
- if (null == listenerSet) {
- Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
- new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
- Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
- listeners.putIfAbsent(logSegmentsPath, newListenerSet);
- listenerSet = (null != oldListenerSet) ? oldListenerSet : newListenerSet;
- }
- synchronized (listenerSet) {
- listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
- if (!listeners.containsKey(logSegmentsPath)) {
- // listener set has been removed, add it back
- if (null != listeners.putIfAbsent(logSegmentsPath, listenerSet)) {
- logger.debug("Listener set is already found for log segments path {}", logSegmentsPath);
- }
- }
- }
- zkWatcher = ZKLogSegmentMetadataStore.this;
- }
- } finally {
- closeLock.readLock().unlock();
- }
- }
- Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
- if (null != listener) {
- getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this));
- }
- // Return the future of the read issued above: calling zkGetLogSegmentNames again would send a
- // second getChildren request, and the caller's result could then come from a different read
- // than the one whose result is delivered to the listener.
- return getLogSegmentNamesResult;
- }
-
- @Override
- public void unregisterLogSegmentListener(String logSegmentsPath,
- LogSegmentNamesListener listener) {
- closeLock.readLock().lock();
- try {
- if (closed) {
- return;
- }
- Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
- listeners.get(logSegmentsPath);
- if (null == listenerSet) {
- return;
- }
- synchronized (listenerSet) {
- listenerSet.remove(listener);
- if (listenerSet.isEmpty()) {
- listeners.remove(logSegmentsPath, listenerSet);
- }
- }
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
- @Override
- public void close() throws IOException {
- closeLock.writeLock().lock();
- try {
- if (closed) {
- return;
- }
- closed = true;
- } finally {
- closeLock.writeLock().unlock();
- }
- }
-
- // Notifications
-
- void notifyLogStreamDeleted(String logSegmentsPath,
- final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners) {
- if (null == listeners) {
- return;
- }
- this.submitTask(logSegmentsPath, new Runnable() {
- @Override
- public void run() {
- for (LogSegmentNamesListener listener : listeners.keySet()) {
- listener.onLogStreamDeleted();
- }
- }
- });
-
- }
-
- void notifyLogSegmentsUpdated(String logSegmentsPath,
- final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners,
- final Versioned<List<String>> segments) {
- if (null == listeners) {
- return;
- }
- this.submitTask(logSegmentsPath, new Runnable() {
- @Override
- public void run() {
- for (VersionedLogSegmentNamesListener listener : listeners.values()) {
- listener.onSegmentsUpdated(segments);
- }
- }
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
deleted file mode 100644
index eeda804..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.MetadataAccessor;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.twitter.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri;
-
-public class ZKMetadataAccessor implements MetadataAccessor {
- static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
- protected final String name;
- protected Promise<Void> closePromise;
- protected final URI uri;
- // zookeeper clients
- // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
- // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
- // keep builders and their client wrappers here, as they will be used when
- // instantiating readers or writers.
- protected final ZooKeeperClientBuilder writerZKCBuilder;
- protected final ZooKeeperClient writerZKC;
- protected final boolean ownWriterZKC;
- protected final ZooKeeperClientBuilder readerZKCBuilder;
- protected final ZooKeeperClient readerZKC;
- protected final boolean ownReaderZKC;
-
- ZKMetadataAccessor(String name,
- DistributedLogConfiguration conf,
- URI uri,
- ZooKeeperClientBuilder writerZKCBuilder,
- ZooKeeperClientBuilder readerZKCBuilder,
- StatsLogger statsLogger) {
- this.name = name;
- this.uri = uri;
-
- if (null == writerZKCBuilder) {
- RetryPolicy retryPolicy = null;
- if (conf.getZKNumRetries() > 0) {
- retryPolicy = new BoundExponentialBackoffRetryPolicy(
- conf.getZKRetryBackoffStartMillis(),
- conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
- }
- this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
- .name(String.format("dlzk:%s:dlm_writer_shared", name))
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkAclId(conf.getZkAclId())
- .uri(uri)
- .retryPolicy(retryPolicy)
- .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared"));
- this.ownWriterZKC = true;
- } else {
- this.writerZKCBuilder = writerZKCBuilder;
- this.ownWriterZKC = false;
- }
- this.writerZKC = this.writerZKCBuilder.build();
-
- if (null == readerZKCBuilder) {
- String zkServersForWriter = getZKServersFromDLUri(uri);
- String zkServersForReader;
- try {
- BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
- zkServersForReader = bkdlConfig.getDlZkServersForReader();
- } catch (IOException e) {
- LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e);
- zkServersForReader = zkServersForWriter;
- }
- if (zkServersForReader.equals(zkServersForWriter)) {
- LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.",
- zkServersForWriter, name);
- this.readerZKCBuilder = this.writerZKCBuilder;
- this.ownReaderZKC = false;
- } else {
- RetryPolicy retryPolicy = null;
- if (conf.getZKNumRetries() > 0) {
- retryPolicy = new BoundExponentialBackoffRetryPolicy(
- conf.getZKRetryBackoffStartMillis(),
- conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
- }
- this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
- .name(String.format("dlzk:%s:dlm_reader_shared", name))
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkServers(zkServersForReader)
- .retryPolicy(retryPolicy)
- .zkAclId(conf.getZkAclId())
- .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared"));
- this.ownReaderZKC = true;
- }
- } else {
- this.readerZKCBuilder = readerZKCBuilder;
- this.ownReaderZKC = false;
- }
- this.readerZKC = this.readerZKCBuilder.build();
- }
-
- /**
- * Get the name of the stream managed by this log manager
- *
- * @return streamName
- */
- @Override
- public String getStreamName() {
- return name;
- }
-
- /**
- * Creates or updates the metadata stored at the node associated with the
- * name and URI. If the node does not exist, it is only created for non-null, non-empty metadata.
- * @param metadata opaque metadata to be stored for the node (null clears the data of an existing node)
- * @throws IOException if the accessor is closed, the call is interrupted or the zookeeper operation fails
- */
- @Override
- public void createOrUpdateMetadata(byte[] metadata) throws IOException {
- checkClosedOrInError("createOrUpdateMetadata");
-
- String zkPath = getZKPath();
- LOG.debug("Setting application specific metadata on {}", zkPath);
- try {
- Stat currentStat = writerZKC.get().exists(zkPath, false);
- if (currentStat == null) {
- if (null != metadata && metadata.length > 0) {
- Utils.zkCreateFullPathOptimistic(writerZKC,
- zkPath,
- metadata,
- writerZKC.getDefaultACL(),
- CreateMode.PERSISTENT);
- }
- } else {
- writerZKC.get().setData(zkPath, metadata, currentStat.getVersion());
- }
- } catch (InterruptedException ie) {
- throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie);
- } catch (Exception exc) {
- throw new IOException("Exception creating or updating container metadata", exc);
- }
- }
-
- /**
- * Delete the metadata stored at the associated node. This only deletes the metadata
- * and not the node itself: it delegates to createOrUpdateMetadata with null metadata.
- * @throws IOException if the accessor is closed or the underlying update fails
- */
- @Override
- public void deleteMetadata() throws IOException {
- checkClosedOrInError("deleteMetadata");
- createOrUpdateMetadata(null);
- }
-
- /**
- * Retrieve the metadata stored at the node
- * @return byte array containing the metadata, or null if the node does not exist
- * @throws IOException if the accessor is closed, the call is interrupted or the zookeeper read fails
- */
- @Override
- public byte[] getMetadata() throws IOException {
- checkClosedOrInError("getMetadata");
- String zkPath = getZKPath();
- LOG.debug("Getting application specific metadata from {}", zkPath);
- try {
- Stat currentStat = readerZKC.get().exists(zkPath, false);
- if (currentStat == null) {
- return null;
- } else {
- return readerZKC.get().getData(zkPath, false, currentStat);
- }
- } catch (InterruptedException ie) {
- throw new DLInterruptedException("Interrupted on reading metadata from zk", ie);
- } catch (Exception e) {
- throw new IOException("Error reading metadata from zk", e);
- }
- }
-
- /**
- * Close the metadata accessor, freeing any resources it may hold.
- * @return future represents the close result.
- */
- @Override
- public Future<Void> asyncClose() {
- Promise<Void> closeFuture;
- synchronized (this) {
- if (null != closePromise) {
- return closePromise;
- }
- closeFuture = closePromise = new Promise<Void>();
- }
- // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests. For the managers created by
- // the namespace, the zookeeper clients will be closed by the namespace.
- try {
- if (ownWriterZKC) {
- writerZKC.close();
- }
- if (ownReaderZKC) {
- readerZKC.close();
- }
- } catch (Exception e) {
- LOG.warn("Exception while closing distributed log manager", e);
- }
- FutureUtils.setValue(closeFuture, null);
- return closeFuture;
- }
-
- @Override
- public void close() throws IOException {
- FutureUtils.result(asyncClose());
- }
-
- public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
- if (null != closePromise) {
- throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor");
- }
- }
-
- protected String getZKPath() {
- return String.format("%s/%s", uri.getPath(), name);
- }
-
- @VisibleForTesting
- protected ZooKeeperClient getReaderZKC() {
- return readerZKC;
- }
-
- @VisibleForTesting
- protected ZooKeeperClient getWriterZKC() {
- return writerZKC;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
deleted file mode 100644
index 06bc8fb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.namespace.NamespaceWatcher;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.twitter.distributedlog.util.DLUtils.*;
-
-/**
- * Watcher that watches a given namespace and notifies its listeners when the streams under it change
- */
-public class ZKNamespaceWatcher extends NamespaceWatcher
- implements Runnable, Watcher, AsyncCallback.Children2Callback {
-
- static final Logger logger = LoggerFactory.getLogger(ZKNamespaceWatcher.class);
-
- private final DistributedLogConfiguration conf;
- private final URI uri;
- private final ZooKeeperClient zkc;
- private final OrderedScheduler scheduler;
- private final AtomicBoolean namespaceWatcherSet = new AtomicBoolean(false);
-
- public ZKNamespaceWatcher(DistributedLogConfiguration conf,
- URI uri,
- ZooKeeperClient zkc,
- OrderedScheduler scheduler) {
- this.conf = conf;
- this.uri = uri;
- this.zkc = zkc;
- this.scheduler = scheduler;
- }
-
- private void scheduleTask(Runnable r, long ms) {
- try {
- scheduler.schedule(r, ms, TimeUnit.MILLISECONDS);
- } catch (RejectedExecutionException ree) {
- logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree});
- }
- }
-
- @Override
- public void run() {
- try {
- doWatchNamespaceChanges();
- } catch (Exception e) {
- logger.error("Encountered unknown exception on watching namespace {} ", uri, e);
- }
- }
-
- public void watchNamespaceChanges() {
- if (!namespaceWatcherSet.compareAndSet(false, true)) {
- return;
- }
- doWatchNamespaceChanges();
- }
-
- private void doWatchNamespaceChanges() {
- try {
- zkc.get().getChildren(uri.getPath(), this, this, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
- } catch (InterruptedException e) {
- logger.warn("Interrupted on watching namespace changes for {} : ", uri, e);
- scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
- }
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- logger.info("Received updated logs under {} : {}", uri, children);
- List<String> result = new ArrayList<String>(children.size());
- for (String s : children) {
- if (isReservedStreamName(s)) {
- continue;
- }
- result.add(s);
- }
- for (NamespaceListener listener : listeners) {
- listener.onStreamsChanged(result.iterator());
- }
- } else {
- scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == Event.EventType.None) {
- if (event.getState() == Event.KeeperState.Expired) {
- scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
- }
- return;
- }
- if (event.getType() == Event.EventType.NodeChildrenChanged) {
- // watch namespace changes again.
- doWatchNamespaceChanges();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
deleted file mode 100644
index 8126723..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.acl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * An {@link AccessControlEntry} together with the ZooKeeper path it is stored at.
- *
- * <p>Entries are written to and read from ZooKeeper as Thrift JSON
- * ({@link TJSONProtocol}). {@link #create} and {@link #update} persist this
- * instance's entry; the static {@link #read} and {@link #delete} operate on a
- * path. All four return a {@link Future} that is completed from the ZooKeeper
- * callback, or failed right away when the request could not be issued.
- */
-public class ZKAccessControl {
-
-    // Size passed to the TMemoryBuffer used while serializing an entry.
-    private static final int BUFFER_SIZE = 4096;
-
-    /**
-     * Entry returned by {@link #deserialize} when a znode carries no data.
-     *
-     * <p>NOTE(review): the very same instance is handed to every caller, so it
-     * should be treated as read-only.
-     */
-    public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry();
-
-    /**
-     * Raised when the bytes stored at a path cannot be parsed as an
-     * {@link AccessControlEntry}.
-     */
-    public static class CorruptedAccessControlException extends IOException {
-
-        private static final long serialVersionUID = 5391285182476211603L;
-
-        /**
-         * @param zkPath path whose content failed to parse
-         * @param t      underlying parse failure
-         */
-        public CorruptedAccessControlException(String zkPath, Throwable t) {
-            super("Access Control @ " + zkPath + " is corrupted.", t);
-        }
-    }
-
-    protected final AccessControlEntry accessControlEntry;
-    protected final String zkPath;
-    // Version sent with setData in update(): -1 from the public constructor,
-    // 0 after a successful create(), otherwise the version reported by the
-    // last successful update() or by read().
-    private int zkVersion;
-
-    /**
-     * Creates an access control for {@code zkPath} with version -1.
-     *
-     * @param ace    the access control entry
-     * @param zkPath ZooKeeper path the entry belongs to
-     */
-    public ZKAccessControl(AccessControlEntry ace, String zkPath) {
-        this(ace, zkPath, -1);
-    }
-
-    private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) {
-        this.accessControlEntry = ace;
-        this.zkPath = zkPath;
-        this.zkVersion = zkVersion;
-    }
-
-    /** Hash over path and entry; the znode version is not included. */
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(zkPath, accessControlEntry);
-    }
-
-    /** Two instances are equal when path and entry are equal; the znode version is ignored. */
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof ZKAccessControl)) {
-            return false;
-        }
-        ZKAccessControl other = (ZKAccessControl) obj;
-        return Objects.equal(zkPath, other.zkPath) &&
-                Objects.equal(accessControlEntry, other.accessControlEntry);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("entry(path=").append(zkPath).append(", acl=")
-                .append(accessControlEntry).append(")");
-        return sb.toString();
-    }
-
-    @VisibleForTesting
-    public String getZKPath() {
-        return zkPath;
-    }
-
-    @VisibleForTesting
-    public AccessControlEntry getAccessControlEntry() {
-        return accessControlEntry;
-    }
-
-    /**
-     * Creates a persistent znode at this instance's path holding the serialized entry.
-     *
-     * @param zkc ZooKeeper client used to issue the request
-     * @return future satisfied with this instance (its version reset to 0) on
-     *         success; failed with a {@link KeeperException} for a non-OK
-     *         result code, or with the exception that prevented the request
-     *         from being issued
-     */
-    public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new AsyncCallback.StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, String name) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                ZKAccessControl.this.zkVersion = 0;
-                                promise.setValue(ZKAccessControl.this);
-                            } else {
-                                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                            }
-                        }
-                    }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Overwrites the znode at this instance's path with the serialized entry,
-     * sending the currently remembered version with the request.
-     *
-     * @param zkc ZooKeeper client used to issue the request
-     * @return future satisfied with this instance (its version refreshed from
-     *         the returned {@link Stat}) on success; failed with a
-     *         {@link KeeperException} for a non-OK result code, or with the
-     *         exception that prevented the request from being issued
-     */
-    public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-        try {
-            zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        ZKAccessControl.this.zkVersion = stat.getVersion();
-                        promise.setValue(ZKAccessControl.this);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        } catch (IOException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Reads and parses the access control stored at {@code zkPath}.
-     *
-     * @param zkc     ZooKeeper client used to issue the request
-     * @param zkPath  path to read
-     * @param watcher watcher passed through to the ZooKeeper read
-     * @return future satisfied with a new instance carrying the parsed entry and
-     *         the znode version; failed with a {@link KeeperException} for a
-     *         non-OK result code, with an {@link IOException} when the content
-     *         cannot be parsed, or with the exception that prevented the
-     *         request from being issued
-     */
-    public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
-        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
-
-        try {
-            zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        try {
-                            AccessControlEntry ace = deserialize(zkPath, data);
-                            promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
-                        } catch (IOException ioe) {
-                            promise.setException(ioe);
-                        }
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Deletes the znode at {@code zkPath}, passing -1 as the version.
-     * A missing node is treated as success.
-     *
-     * @param zkc    ZooKeeper client used to issue the request
-     * @param zkPath path to delete
-     * @return future satisfied with {@code null} when the result code is OK or
-     *         NONODE; failed with a {@link KeeperException} for any other
-     *         result code, or with the exception that prevented the request
-     *         from being issued
-     */
-    public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
-        final Promise<Void> promise = new Promise<Void>();
-
-        try {
-            zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (KeeperException.Code.OK.intValue() == rc ||
-                            KeeperException.Code.NONODE.intValue() == rc) {
-                        promise.setValue(null);
-                    } else {
-                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    /**
-     * Serializes an entry to the UTF-8 bytes of its Thrift JSON form.
-     *
-     * @param ace entry to serialize
-     * @return serialized bytes
-     * @throws IOException if Thrift fails to write the entry or the buffer
-     *         cannot be decoded
-     */
-    static byte[] serialize(AccessControlEntry ace) throws IOException {
-        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.write(protocol);
-            transport.flush();
-            return transport.toString(UTF_8.name()).getBytes(UTF_8);
-        } catch (TException e) {
-            throw new IOException("Failed to serialize access control entry : ", e);
-        } catch (UnsupportedEncodingException uee) {
-            throw new IOException("Failed to serialize access control entry : ", uee);
-        }
-    }
-
-    /**
-     * Parses the bytes stored at {@code zkPath} into an entry.
-     *
-     * @param zkPath path the bytes were read from; used only for error reporting
-     * @param data   stored bytes; may be {@code null} or empty
-     * @return {@link #DEFAULT_ACCESS_CONTROL_ENTRY} when there is no data,
-     *         otherwise a newly parsed entry
-     * @throws IOException ({@link CorruptedAccessControlException}) when the
-     *         bytes are not a valid Thrift JSON entry
-     */
-    static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException {
-        // A null payload is handled like an empty one. Without this check the
-        // NullPointerException would escape the data callback in read(), which
-        // only catches IOException, and the promise would never complete.
-        if (null == data || data.length == 0) {
-            return DEFAULT_ACCESS_CONTROL_ENTRY;
-        }
-
-        AccessControlEntry ace = new AccessControlEntry();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TJSONProtocol protocol = new TJSONProtocol(transport);
-        try {
-            ace.read(protocol);
-        } catch (TException e) {
-            throw new CorruptedAccessControlException(zkPath, e);
-        }
-        return ace;
-    }
-
-}