You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:39 UTC
[25/31] incubator-distributedlog git commit: DL-163: clean up direct
zookeeper and bookkeeper usage and use metadata/data store abstraction
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
index 850f9c8..bcf8129 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -19,8 +19,14 @@ package com.twitter.distributedlog.logsegment;
import com.google.common.annotations.Beta;
import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
+import com.twitter.distributedlog.util.Allocator;
+import com.twitter.distributedlog.util.Transaction;
import com.twitter.util.Future;
+import java.io.IOException;
+
/**
* Log Segment Store to read log segments
*/
@@ -36,12 +42,14 @@ public interface LogSegmentEntryStore {
Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
/**
- * Open the writer for writing data to the log <i>segment</i>.
+ * Create a new log segment allocator for allocating log segment entry writers.
*
- * @param segment the log <i>segment</i> to write data to
- * @return future represent the opened writer
+ * @param metadata the metadata for the log stream
+ * @return the log segment allocator
*/
- Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment);
+ Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+ LogMetadataForWriter metadata,
+ DynamicDistributedLogConfiguration dynConf) throws IOException;
/**
* Open the reader for reading data to the log <i>segment</i>.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
deleted file mode 100644
index ac36ef2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.thrift.BKDLConfigFormat;
-import com.twitter.distributedlog.util.DLUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Configurations for BookKeeper based DL.
- */
-public class BKDLConfig implements DLConfig {
-
- private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class);
-
- private static final int BUFFER_SIZE = 4096;
- private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
- new ConcurrentHashMap<URI, DLConfig>();
-
- public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
- dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
- dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
- if (bkdlConfig.isFederatedNamespace()) {
- dlConf.setCreateStreamIfNotExists(false);
- LOG.info("Disabled createIfNotExists for federated namespace.");
- }
- LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
- " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
- new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
- dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
- bkdlConfig.isFederatedNamespace() });
- }
-
- public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
- DLConfig dlConfig = cachedDLConfigs.get(uri);
- if (dlConfig == null) {
- dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
- DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig);
- if (null != oldDLConfig) {
- dlConfig = oldDLConfig;
- }
- }
- assert (dlConfig instanceof BKDLConfig);
- return (BKDLConfig)dlConfig;
- }
-
- @VisibleForTesting
- public static void clearCachedDLConfigs() {
- cachedDLConfigs.clear();
- }
-
- private String bkZkServersForWriter;
- private String bkZkServersForReader;
- private String bkLedgersPath;
- private boolean sanityCheckTxnID = true;
- private boolean encodeRegionID = false;
- private String dlZkServersForWriter;
- private String dlZkServersForReader;
- private String aclRootPath;
- private Long firstLogSegmentSeqNo;
- private boolean isFederatedNamespace = false;
-
- /**
- * Construct a empty config with given <i>uri</i>.
- */
- BKDLConfig(URI uri) {
- this(DLUtils.getZKServersFromDLUri(uri),
- DLUtils.getZKServersFromDLUri(uri),
- null, null, null);
- }
-
- /**
- * The caller should make sure both dl and bk use same zookeeper server.
- *
- * @param zkServers
- * zk servers used for both dl and bk.
- * @param ledgersPath
- * ledgers path.
- */
- @VisibleForTesting
- public BKDLConfig(String zkServers, String ledgersPath) {
- this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
- }
-
- public BKDLConfig(String dlZkServersForWriter,
- String dlZkServersForReader,
- String bkZkServersForWriter,
- String bkZkServersForReader,
- String bkLedgersPath) {
- this.dlZkServersForWriter = dlZkServersForWriter;
- this.dlZkServersForReader = dlZkServersForReader;
- this.bkZkServersForWriter = bkZkServersForWriter;
- this.bkZkServersForReader = bkZkServersForReader;
- this.bkLedgersPath = bkLedgersPath;
- }
-
- /**
- * @return zk servers used for bk for writers
- */
- public String getBkZkServersForWriter() {
- return bkZkServersForWriter;
- }
-
- /**
- * @return zk servers used for bk for readers
- */
- public String getBkZkServersForReader() {
- return bkZkServersForReader;
- }
-
- /**
- * @return zk servers used for dl for writers
- */
- public String getDlZkServersForWriter() {
- return dlZkServersForWriter;
- }
-
- /**
- * @return zk servers used for dl for readers
- */
- public String getDlZkServersForReader() {
- return dlZkServersForReader;
- }
-
- /**
- * @return ledgers path for bk
- */
- public String getBkLedgersPath() {
- return bkLedgersPath;
- }
-
- /**
- * Enable/Disable sanity check txn id.
- *
- * @param enabled
- * flag to enable/disable sanity check txn id.
- * @return bk dl config.
- */
- public BKDLConfig setSanityCheckTxnID(boolean enabled) {
- this.sanityCheckTxnID = enabled;
- return this;
- }
-
- /**
- * @return flag to sanity check highest txn id.
- */
- public boolean getSanityCheckTxnID() {
- return sanityCheckTxnID;
- }
-
- /**
- * Enable/Disable encode region id.
- *
- * @param enabled
- * flag to enable/disable encoding region id.
- * @return bk dl config
- */
- public BKDLConfig setEncodeRegionID(boolean enabled) {
- this.encodeRegionID = enabled;
- return this;
- }
-
- /**
- * @return flag to encode region id.
- */
- public boolean getEncodeRegionID() {
- return encodeRegionID;
- }
-
- /**
- * Set the root path of zk based ACL manager.
- *
- * @param aclRootPath
- * root path of zk based ACL manager.
- * @return bk dl config
- */
- public BKDLConfig setACLRootPath(String aclRootPath) {
- this.aclRootPath = aclRootPath;
- return this;
- }
-
- /**
- * Get the root path of zk based ACL manager.
- *
- * @return root path of zk based ACL manager.
- */
- public String getACLRootPath() {
- return aclRootPath;
- }
-
- /**
- * Set the value at which ledger sequence number should start for streams that are being
- * upgraded and did not have ledger sequence number to start with or for newly created
- * streams
- *
- * @param firstLogSegmentSeqNo first ledger sequence number
- * @return bk dl config
- */
- public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
- this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
- return this;
- }
-
- /**
- * Get the value at which ledger sequence number should start for streams that are being
- * upgraded and did not have ledger sequence number to start with or for newly created
- * streams
- *
- * @return first ledger sequence number
- */
- public Long getFirstLogSegmentSeqNo() {
- if (null == firstLogSegmentSeqNo) {
- return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
- }
- return firstLogSegmentSeqNo;
- }
-
- /**
- * Set the namespace to federated <i>isFederatedNamespace</i>.
- *
- * @param isFederatedNamespace
- * is the namespace federated?
- * @return bk dl config
- */
- public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
- this.isFederatedNamespace = isFederatedNamespace;
- return this;
- }
-
- /**
- * Whether the namespace is federated namespace
- *
- * @return true if the namespace is a federated namespace. otherwise false.
- */
- public boolean isFederatedNamespace() {
- return this.isFederatedNamespace;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
- dlZkServersForWriter, dlZkServersForReader,
- bkLedgersPath);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof BKDLConfig)) {
- return false;
- }
- BKDLConfig another = (BKDLConfig) o;
- return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) &&
- Objects.equal(bkZkServersForReader, another.bkZkServersForReader) &&
- Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) &&
- Objects.equal(dlZkServersForReader, another.dlZkServersForReader) &&
- Objects.equal(bkLedgersPath, another.bkLedgersPath) &&
- sanityCheckTxnID == another.sanityCheckTxnID &&
- encodeRegionID == another.encodeRegionID &&
- Objects.equal(aclRootPath, another.aclRootPath) &&
- Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) &&
- Objects.equal(isFederatedNamespace, another.isFederatedNamespace);
-
- }
-
- @Override
- public String toString() {
- return serialize();
- }
-
- @Override
- public String serialize() {
- BKDLConfigFormat configFormat = new BKDLConfigFormat();
- if (null != bkZkServersForWriter) {
- configFormat.setBkZkServers(bkZkServersForWriter);
- }
- if (null != bkZkServersForReader) {
- configFormat.setBkZkServersForReader(bkZkServersForReader);
- }
- if (null != dlZkServersForWriter) {
- configFormat.setDlZkServersForWriter(dlZkServersForWriter);
- }
- if (null != dlZkServersForReader) {
- configFormat.setDlZkServersForReader(dlZkServersForReader);
- }
- if (null != bkLedgersPath) {
- configFormat.setBkLedgersPath(bkLedgersPath);
- }
- configFormat.setSanityCheckTxnID(sanityCheckTxnID);
- configFormat.setEncodeRegionID(encodeRegionID);
- if (null != aclRootPath) {
- configFormat.setAclRootPath(aclRootPath);
- }
- if (null != firstLogSegmentSeqNo) {
- configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
- }
- if (isFederatedNamespace) {
- configFormat.setFederatedNamespace(true);
- }
- return serialize(configFormat);
- }
-
- String serialize(BKDLConfigFormat configFormat) {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- configFormat.write(protocol);
- transport.flush();
- return transport.toString("UTF-8");
- } catch (TException e) {
- throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
- }
- }
-
- @Override
- public void deserialize(byte[] data) throws IOException {
- BKDLConfigFormat configFormat = new BKDLConfigFormat();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- configFormat.read(protocol);
- } catch (TException e) {
- throw new IOException("Failed to deserialize data '" +
- new String(data, UTF_8) + "' : ", e);
- }
- // bookkeeper cluster settings
- if (configFormat.isSetBkZkServers()) {
- bkZkServersForWriter = configFormat.getBkZkServers();
- }
- if (configFormat.isSetBkZkServersForReader()) {
- bkZkServersForReader = configFormat.getBkZkServersForReader();
- } else {
- bkZkServersForReader = bkZkServersForWriter;
- }
- if (configFormat.isSetBkLedgersPath()) {
- bkLedgersPath = configFormat.getBkLedgersPath();
- }
- // dl zookeeper cluster settings
- if (configFormat.isSetDlZkServersForWriter()) {
- dlZkServersForWriter = configFormat.getDlZkServersForWriter();
- }
- if (configFormat.isSetDlZkServersForReader()) {
- dlZkServersForReader = configFormat.getDlZkServersForReader();
- } else {
- dlZkServersForReader = dlZkServersForWriter;
- }
- // dl settings
- sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID();
- encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID();
- if (configFormat.isSetAclRootPath()) {
- aclRootPath = configFormat.getAclRootPath();
- }
-
- if (configFormat.isSetFirstLogSegmentSeqNo()) {
- firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo();
- }
- isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace();
-
- // Validate the settings
- if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath ||
- null == dlZkServersForWriter || null == dlZkServersForReader) {
- throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
index e0331c6..c0b5fb7 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
@@ -18,6 +18,7 @@
package com.twitter.distributedlog.metadata;
import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.util.Utils;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
deleted file mode 100644
index 303fbe6..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.net.URI;
-
-public class ZkMetadataResolver implements MetadataResolver {
-
- private final ZooKeeperClient zkc;
-
- public ZkMetadataResolver(ZooKeeperClient zkc) {
- this.zkc = zkc;
- }
-
- @Override
- public DLMetadata resolve(URI uri) throws IOException {
- String dlPath = uri.getPath();
- PathUtils.validatePath(dlPath);
- // Normal case the dl metadata is stored in the last segment
- // so lookup last segment first.
- String[] parts = StringUtils.split(dlPath, '/');
- if (null == parts || 0 == parts.length) {
- throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath);
- }
- for (int i = parts.length; i >= 0; i--) {
- String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i));
- byte[] data;
- try {
- data = zkc.get().getData(pathToResolve, false, new Stat());
- } catch (KeeperException.NoNodeException nne) {
- continue;
- } catch (KeeperException ke) {
- throw new IOException("Fail to resolve dl path : " + pathToResolve);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted when resolving dl path : " + pathToResolve);
- }
- if (null == data || data.length == 0) {
- continue;
- }
- try {
- return DLMetadata.deserialize(uri, data);
- } catch (IOException ie) {
- throw new IOException("Failed to deserialize uri : " + uri);
- }
- }
- throw new IOException("No bkdl config bound under dl path : " + dlPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
index b5abe9f..5d1d888 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
@@ -70,6 +70,13 @@ import org.apache.bookkeeper.stats.StatsLogger;
@Beta
public interface DistributedLogNamespace {
+ /**
+ * Get the namespace driver used by this namespace.
+ *
+ * @return namespace driver
+ */
+ NamespaceDriver getNamespaceDriver();
+
//
// Method to operate logs
//
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
index a01bb70..07b3848 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java
@@ -17,22 +17,29 @@
*/
package com.twitter.distributedlog.namespace;
-import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.twitter.distributedlog.BKDistributedLogNamespace;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.feature.CoreFeatureKeys;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
+import com.twitter.distributedlog.util.ConfUtils;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.PermitLimiter;
+import com.twitter.distributedlog.util.SimplePermitLimiter;
+import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
/**
* Builder to construct a <code>DistributedLogNamespace</code>.
@@ -50,6 +57,7 @@ public class DistributedLogNamespaceBuilder {
}
private DistributedLogConfiguration _conf = null;
+ private DynamicDistributedLogConfiguration _dynConf = null;
private URI _uri = null;
private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
@@ -73,6 +81,17 @@ public class DistributedLogNamespaceBuilder {
}
/**
+ * Dynamic DistributedLog configuration used for the namespace.
+ *
+ * @param dynConf dynamic distributedlog configuration
+ * @return namespace builder
+ */
+ public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) {
+ this._dynConf = dynConf;
+ return this;
+ }
+
+ /**
* Namespace Location.
*
* @param uri
@@ -146,6 +165,18 @@ public class DistributedLogNamespaceBuilder {
return this;
}
+ @SuppressWarnings("deprecation")
+ private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ DistributedLogConfiguration conf) {
+ StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger;
+ if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
+ conf.getEnablePerStreamStat()) {
+ normalizedPerLogStatsLogger = statsLogger.scope("stream");
+ }
+ return normalizedPerLogStatsLogger;
+ }
+
/**
* Build the namespace.
*
@@ -160,25 +191,17 @@ public class DistributedLogNamespaceBuilder {
Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
Preconditions.checkNotNull(_uri, "No DistributedLog URI");
- // Validate the uri and load the backend according to scheme
- String scheme = _uri.getScheme();
- Preconditions.checkNotNull(scheme, "Invalid DistributedLog URI : " + _uri);
- String[] schemeParts = StringUtils.split(scheme, '-');
- Preconditions.checkArgument(schemeParts.length > 0,
- "Invalid distributedlog scheme found : " + _uri);
- Preconditions.checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
- "Unknown distributedlog scheme found : " + _uri);
-
- // both distributedlog: & distributedlog-bk: use bookkeeper as the backend
- // TODO: we could do reflection to load backend in future.
- // if we are going to support other backends : e.g. 'distributedlog-mem:'.
- if (schemeParts.length > 1) {
- String backendProvider = schemeParts[1];
- Preconditions.checkArgument(Objects.equal(DistributedLogConstants.BACKEND_BK, backendProvider.toLowerCase()),
- "Backend '" + backendProvider + "' is not supported yet.");
+ // validate the configuration
+ _conf.validate();
+ if (null == _dynConf) {
+ _dynConf = ConfUtils.getConstDynConf(_conf);
}
- // Built the feature provider
+ // retrieve the namespace driver
+ NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri);
+ URI normalizedUri = DLUtils.normalizeURI(_uri);
+
+ // build the feature provider
FeatureProvider featureProvider;
if (null == _featureProvider) {
featureProvider = new SettableFeatureProvider("", 0);
@@ -187,25 +210,69 @@ public class DistributedLogNamespaceBuilder {
featureProvider = _featureProvider;
}
- URI bkUri;
- try {
- bkUri = new URI(
- schemeParts[0], // remove backend info from bookkeeper backend
- _uri.getAuthority(),
- _uri.getPath(),
- _uri.getQuery(),
- _uri.getFragment());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Invalid distributedlog uri found : " + _uri, e);
- }
+ // build the failure injector
+ AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
+ .injectDelays(_conf.getEIInjectReadAheadDelay(),
+ _conf.getEIInjectReadAheadDelayPercent(),
+ _conf.getEIInjectMaxReadAheadDelayMs())
+ .injectErrors(false, 10)
+ .injectStops(_conf.getEIInjectReadAheadStall(), 10)
+ .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries())
+ .build();
- return BKDistributedLogNamespace.newBuilder()
- .conf(_conf)
- .uri(bkUri)
- .statsLogger(_statsLogger)
- .featureProvider(featureProvider)
- .clientId(_clientId)
- .regionId(_regionId)
+ // normalize the per log stats logger
+ StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
+
+ // build the scheduler
+ StatsLogger schedulerStatsLogger = _statsLogger.scope("factory").scope("thread_pool");
+ OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+ .name("DLM-" + normalizedUri.getPath())
+ .corePoolSize(_conf.getNumWorkerThreads())
+ .statsLogger(schedulerStatsLogger)
+ .perExecutorStatsLogger(schedulerStatsLogger)
+ .traceTaskExecution(_conf.getEnableTaskExecutionStats())
+ .traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros())
.build();
+
+ // initialize the namespace driver
+ driver.initialize(
+ _conf,
+ _dynConf,
+ normalizedUri,
+ scheduler,
+ featureProvider,
+ failureInjector,
+ _statsLogger,
+ perLogStatsLogger,
+ DLUtils.normalizeClientId(_clientId),
+ _regionId);
+
+ // initialize the write limiter
+ PermitLimiter writeLimiter;
+ if (_conf.getGlobalOutstandingWriteLimit() < 0) {
+ writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+ } else {
+ Feature disableWriteLimitFeature = featureProvider.getFeature(
+ CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
+ writeLimiter = new SimplePermitLimiter(
+ _conf.getOutstandingWriteLimitDarkmode(),
+ _conf.getGlobalOutstandingWriteLimit(),
+ _statsLogger.scope("writeLimiter"),
+ true /* singleton */,
+ disableWriteLimitFeature);
+ }
+
+ return new BKDistributedLogNamespace(
+ _conf,
+ normalizedUri,
+ driver,
+ scheduler,
+ featureProvider,
+ writeLimiter,
+ failureInjector,
+ _statsLogger,
+ perLogStatsLogger,
+ DLUtils.normalizeClientId(_clientId),
+ _regionId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
new file mode 100644
index 0000000..738f124
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.namespace;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Driver that manages all the stores required by a namespace.
+ */
+public interface NamespaceDriver extends Closeable {
+
+ enum Role {
+ WRITER,
+ READER
+ }
+
+ /**
+ * Initialize the namespace driver.
+ *
+ * @param conf distributedlog configuration
+ * @param dynConf dynamic distributedlog configuration
+ * @param namespace root uri of the namespace
+ * @param scheduler ordered scheduler
+ * @param featureProvider feature provider
+ * @param statsLogger stats logger
+ * @param perLogStatsLogger per log stream stats logger
+ * @param clientId client id
+ * @return the initialized namespace driver
+ * @throws IOException if the namespace driver fails to initialize
+ */
+ NamespaceDriver initialize(DistributedLogConfiguration conf,
+ DynamicDistributedLogConfiguration dynConf,
+ URI namespace,
+ OrderedScheduler scheduler,
+ FeatureProvider featureProvider,
+ AsyncFailureInjector failureInjector,
+ StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ String clientId,
+ int regionId) throws IOException;
+
+ /**
+ * Get the scheme of the namespace driver.
+ *
+ * @return the scheme of the namespace driver.
+ */
+ String getScheme();
+
+ /**
+ * Get the root uri of the namespace driver.
+ *
+ * @return the root uri of the namespace driver.
+ */
+ URI getUri();
+
+ /**
+ * Retrieve the log {@code metadata store} used by the namespace.
+ *
+ * @return the log metadata store
+ */
+ LogMetadataStore getLogMetadataStore();
+
+ /**
+ * Retrieve the log stream {@code metadata store} used by the namespace.
+ *
+ * @param role the role to retrieve the log stream metadata store.
+ * @return the log stream metadata store
+ */
+ LogStreamMetadataStore getLogStreamMetadataStore(Role role);
+
+ /**
+ * Retrieve the log segment {@code entry store} used by the namespace.
+ *
+ * @param role the role to retrieve the log segment entry store.
+ * @return the log segment entry store.
+ * @throws IOException when failed to open log segment entry store.
+ */
+ LogSegmentEntryStore getLogSegmentEntryStore(Role role);
+
+ /**
+ * Create an access control manager to manage/check acl for logs.
+ *
+ * @return access control manager for logs under the namespace.
+ * @throws IOException if the access control manager cannot be created
+ */
+ AccessControlManager getAccessControlManager()
+ throws IOException;
+
+ /**
+ * Retrieve the metadata accessor for log stream {@code streamName}.
+ * (TODO: it is a legacy interface. should remove it if we have metadata of stream.)
+ *
+ * @param streamName name of log stream.
+ * @return metadata accessor for log stream {@code streamName}.
+ */
+ MetadataAccessor getMetadataAccessor(String streamName)
+ throws InvalidStreamNameException, IOException;
+
+ /**
+ * Retrieve the subscriptions store for log stream {@code streamName}.
+ *
+ * @return the subscriptions store for log stream {@code streamName}
+ */
+ SubscriptionsStore getSubscriptionsStore(String streamName);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
new file mode 100644
index 0000000..79945ad
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.namespace;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * The basic service for managing a set of namespace drivers.
+ */
+public class NamespaceDriverManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(NamespaceDriverManager.class);
+
+ static class NamespaceDriverInfo {
+
+ final Class<? extends NamespaceDriver> driverClass;
+ final String driverClassName;
+
+ NamespaceDriverInfo(Class<? extends NamespaceDriver> driverClass) {
+ this.driverClass = driverClass;
+ this.driverClassName = this.driverClass.getName();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("driver[")
+ .append(driverClassName)
+ .append("]");
+ return sb.toString();
+ }
+ }
+
+ private static final ConcurrentMap<String, NamespaceDriverInfo> drivers;
+ private static boolean initialized = false;
+
+ static {
+ drivers = new ConcurrentHashMap<String, NamespaceDriverInfo>();
+ initialize();
+ }
+
+ static void initialize() {
+ if (initialized) {
+ return;
+ }
+ loadInitialDrivers();
+ initialized = true;
+ logger.info("DistributedLog NamespaceDriverManager initialized");
+ }
+
+ private static void loadInitialDrivers() {
+ Set<String> driverList = Sets.newHashSet();
+ // add default bookkeeper based driver
+ driverList.add(BKNamespaceDriver.class.getName());
+ // load drivers from system property
+ String driversStr = System.getProperty("distributedlog.namespace.drivers");
+ if (null != driversStr) {
+ String[] driversArray = StringUtils.split(driversStr, ':');
+ for (String driver : driversArray) {
+ driverList.add(driver);
+ }
+ }
+ // initialize the drivers
+ for (String driverClsName : driverList) {
+ try {
+ NamespaceDriver driver =
+ ReflectionUtils.newInstance(driverClsName, NamespaceDriver.class);
+ NamespaceDriverInfo driverInfo = new NamespaceDriverInfo(driver.getClass());
+ drivers.put(driver.getScheme().toLowerCase(), driverInfo);
+ } catch (Exception ex) {
+ logger.warn("Failed to load namespace driver {} : ", driverClsName, ex);
+ }
+ }
+ }
+
+ /**
+ * Prevent the NamespaceDriverManager class from being instantiated.
+ */
+ private NamespaceDriverManager() {}
+
+ /**
+ * Register the namespace {@code driver} for {@code backend} unless one is already registered.
+ *
+ * @param backend the backend scheme; it is lower-cased before lookup
+ * @param driver the namespace driver class
+ */
+ public static void registerDriver(String backend, Class<? extends NamespaceDriver> driver) {
+ if (!initialized) {
+ initialize();
+ }
+
+ String scheme = backend.toLowerCase();
+ NamespaceDriverInfo oldDriverInfo = drivers.get(scheme);
+ if (null != oldDriverInfo) {
+ return;
+ }
+ NamespaceDriverInfo newDriverInfo = new NamespaceDriverInfo(driver);
+ oldDriverInfo = drivers.putIfAbsent(scheme, newDriverInfo);
+ if (null != oldDriverInfo) {
+ logger.debug("Driver for {} is already there.", scheme);
+ }
+ }
+
+ /**
+ * Retrieve the namespace driver for {@code scheme}.
+ * @param scheme the scheme for the namespace driver; it is lower-cased before lookup
+ * @return the namespace driver
+ * @throws NullPointerException when scheme is null
+ * @throws IllegalArgumentException when no namespace driver is registered for the scheme
+ */
+ public static NamespaceDriver getDriver(String scheme) {
+ checkNotNull(scheme, "Driver Scheme is null");
+ if (!initialized) {
+ initialize();
+ }
+ NamespaceDriverInfo driverInfo = drivers.get(scheme.toLowerCase());
+ if (null == driverInfo) {
+ throw new IllegalArgumentException("Unknown backend " + scheme);
+ }
+ return ReflectionUtils.newInstance(driverInfo.driverClass);
+ }
+
+ /**
+ * Retrieve the namespace driver for {@code uri}.
+ *
+ * @param uri the distributedlog uri
+ * @return the namespace driver for {@code uri}
+ * @throws NullPointerException if the distributedlog {@code uri} is null or doesn't have scheme
+ * @throws IllegalArgumentException if the distributedlog {@code uri} scheme is illegal
+ * or there is no namespace driver registered for the scheme
+ */
+ public static NamespaceDriver getDriver(URI uri) {
+ // Validate the uri and load the backend according to scheme
+ checkNotNull(uri, "DistributedLog uri is null");
+ String scheme = uri.getScheme();
+ checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
+ scheme = scheme.toLowerCase();
+ String[] schemeParts = StringUtils.split(scheme, '-');
+ checkArgument(schemeParts.length > 0,
+ "Invalid distributedlog scheme found : " + uri);
+ checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
+ "Unknown distributedlog scheme found : " + uri);
+ // bookkeeper is the default backend
+ String backend = DistributedLogConstants.BACKEND_BK;
+ if (schemeParts.length > 1) {
+ backend = schemeParts[1];
+ }
+ return getDriver(backend);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
deleted file mode 100644
index 9cd2da5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.subscription;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-public class ZKSubscriptionStateStore implements SubscriptionStateStore {
-
- static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class);
-
- private final ZooKeeperClient zooKeeperClient;
- private final String zkPath;
- private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
-
- public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
- this.zooKeeperClient = zooKeeperClient;
- this.zkPath = zkPath;
- }
-
- @Override
- public void close() throws IOException {
- }
-
- /**
- * Get the last committed position stored for this subscription
- */
- @Override
- public Future<DLSN> getLastCommitPosition() {
- if (null != lastCommittedPosition.get()) {
- return Future.value(lastCommittedPosition.get());
- } else {
- return getLastCommitPositionFromZK();
- }
- }
-
- Future<DLSN> getLastCommitPositionFromZK() {
- final Promise<DLSN> result = new Promise<DLSN>();
- try {
- logger.debug("Reading last commit position from path {}", zkPath);
- zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
- if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(DLSN.NonInclusiveLowerBound);
- } else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- try {
- DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
- result.setValue(dlsn);
- } catch (Exception t) {
- logger.warn("Invalid last commit position found from path {}", zkPath, t);
- // invalid dlsn recorded in subscription state store
- result.setValue(DLSN.NonInclusiveLowerBound);
- }
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
- } catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
- }
- return result;
- }
-
- /**
- * Advances the position associated with the subscriber
- *
- * @param newPosition - new commit position
- */
- @Override
- public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
- if (null == lastCommittedPosition.get() ||
- (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
- lastCommittedPosition.set(newPosition);
- return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
- zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
- zooKeeperClient.getDefaultACL(),
- CreateMode.PERSISTENT);
- } else {
- return Future.Done();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
deleted file mode 100644
index f1e6251..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.subscription;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * ZooKeeper Based Subscriptions Store.
- */
-public class ZKSubscriptionsStore implements SubscriptionsStore {
-
- private final ZooKeeperClient zkc;
- private final String zkPath;
- private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
- new ConcurrentHashMap<String, ZKSubscriptionStateStore>();
-
- public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) {
- this.zkc = zkc;
- this.zkPath = zkPath;
- }
-
- private ZKSubscriptionStateStore getSubscriber(String subscriberId) {
- ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
- if (ss == null) {
- ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
- getSubscriberZKPath(subscriberId));
- ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
- if (oldSS == null) {
- ss = newSS;
- } else {
- try {
- newSS.close();
- } catch (IOException e) {
- // ignore the exception
- }
- ss = oldSS;
- }
- }
- return ss;
- }
-
- private String getSubscriberZKPath(String subscriberId) {
- return String.format("%s/%s", zkPath, subscriberId);
- }
-
- @Override
- public Future<DLSN> getLastCommitPosition(String subscriberId) {
- return getSubscriber(subscriberId).getLastCommitPosition();
- }
-
- @Override
- public Future<Map<String, DLSN>> getLastCommitPositions() {
- final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
- try {
- this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(new HashMap<String, DLSN>());
- } else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- getLastCommitPositions(result, children);
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
- } catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
- }
- return result;
- }
-
- private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
- List<String> subscribers) {
- List<Future<Pair<String, DLSN>>> futures =
- new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
- for (String s : subscribers) {
- final String subscriber = s;
- Future<Pair<String, DLSN>> future =
- // Get the last commit position from zookeeper
- getSubscriber(subscriber).getLastCommitPositionFromZK().map(
- new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
- @Override
- public Pair<String, DLSN> apply(DLSN dlsn) {
- return Pair.of(subscriber, dlsn);
- }
- });
- futures.add(future);
- }
- Future.collect(futures).foreach(
- new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
- Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
- for (Pair<String, DLSN> pair : subscriptions) {
- subscriptionMap.put(pair.getLeft(), pair.getRight());
- }
- result.setValue(subscriptionMap);
- return BoxedUnit.UNIT;
- }
- });
- }
-
- @Override
- public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
- return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
- }
-
- @Override
- public Future<Boolean> deleteSubscriber(String subscriberId) {
- subscribers.remove(subscriberId);
- String path = getSubscriberZKPath(subscriberId);
- return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
- }
-
- @Override
- public void close() throws IOException {
- // no-op
- for (SubscriptionStateStore store : subscribers.values()) {
- store.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 30d6908..4565921 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -30,7 +30,6 @@ import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
@@ -53,11 +52,16 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
import com.twitter.distributedlog.BKDistributedLogNamespace;
import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.MetadataAccessor;
import com.twitter.distributedlog.callback.NamespaceListener;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
import com.twitter.distributedlog.util.Utils;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -100,17 +104,15 @@ import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.auditor.DLAuditor;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.MetadataUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.util.Await;
-import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import static com.google.common.base.Charsets.UTF_8;
-@SuppressWarnings("deprecation")
public class DistributedLogTool extends Tool {
static final Logger logger = LoggerFactory.getLogger(DistributedLogTool.class);
@@ -161,7 +163,7 @@ public class DistributedLogTool extends Tool {
protected URI uri;
protected String zkAclId = null;
protected boolean force = false;
- protected com.twitter.distributedlog.DistributedLogManagerFactory factory = null;
+ protected DistributedLogNamespace namespace = null;
protected PerDLCommand(String name, String description) {
super(name, description);
@@ -187,8 +189,8 @@ public class DistributedLogTool extends Tool {
return runCmd();
} finally {
synchronized (this) {
- if (null != factory) {
- factory.close();
+ if (null != namespace) {
+ namespace.close();
}
}
}
@@ -252,35 +254,33 @@ public class DistributedLogTool extends Tool {
this.force = force;
}
- protected synchronized com.twitter.distributedlog.DistributedLogManagerFactory getFactory() throws IOException {
- if (null == this.factory) {
- this.factory = new com.twitter.distributedlog.DistributedLogManagerFactory(getConf(), getUri());
- logger.info("Construct DLM : uri = {}", getUri());
- }
- return this.factory;
- }
-
protected DistributedLogNamespace getNamespace() throws IOException {
- return getFactory().getNamespace();
+ if (null == this.namespace) {
+ this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+ .uri(getUri())
+ .conf(getConf())
+ .build();
+ }
+ return this.namespace;
}
protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException {
- DistributedLogNamespace namespace = getFactory().getNamespace();
- assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+ return getNamespace()
+ .getNamespaceDriver()
+ .getLogStreamMetadataStore(NamespaceDriver.Role.READER)
.getLogSegmentMetadataStore();
}
protected ZooKeeperClient getZooKeeperClient() throws IOException {
- DistributedLogNamespace namespace = getFactory().getNamespace();
- assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
+ NamespaceDriver driver = getNamespace().getNamespaceDriver();
+ assert(driver instanceof BKNamespaceDriver);
+ return ((BKNamespaceDriver) driver).getWriterZKC();
}
protected BookKeeperClient getBookKeeperClient() throws IOException {
- DistributedLogNamespace namespace = getFactory().getNamespace();
- assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getReaderBKC();
+ NamespaceDriver driver = getNamespace().getNamespaceDriver();
+ assert(driver instanceof BKNamespaceDriver);
+ return ((BKNamespaceDriver) driver).getReaderBKC();
}
}
@@ -347,6 +347,10 @@ public class DistributedLogTool extends Tool {
}
}
+ /**
+ * NOTE: we might consider adding a command to 'delete' namespace. The implementation of the namespace
+ * driver should implement the 'delete' operation.
+ */
protected static class DeleteAllocatorPoolCommand extends PerDLCommand {
int concurrency = 1;
@@ -380,8 +384,12 @@ public class DistributedLogTool extends Tool {
String rootPath = getUri().getPath() + "/" + allocationPoolPath;
final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+ Preconditions.checkArgument(getNamespace() instanceof BKDistributedLogNamespace);
+ BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace();
+ final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC();
+ final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC();
try {
- List<String> pools = getZooKeeperClient().get().getChildren(rootPath, false);
+ List<String> pools = zkc.get().getChildren(rootPath, false);
final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>();
if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) {
for (String pool : pools) {
@@ -401,7 +409,7 @@ public class DistributedLogTool extends Tool {
try {
LedgerAllocator allocator =
LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
- getZooKeeperClient(), getBookKeeperClient(),
+ zkc, bkc,
allocationExecutor);
allocator.delete();
System.out.println("Deleted allocator pool : " + poolPath + " .");
@@ -454,43 +462,35 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
- if (printMetadata) {
- printStreamsWithMetadata(getFactory());
- } else {
- printStreams(getFactory());
- }
+ printStreams(getNamespace());
return 0;
}
- protected void printStreamsWithMetadata(com.twitter.distributedlog.DistributedLogManagerFactory factory)
- throws Exception {
- Map<String, byte[]> streams = factory.enumerateLogsWithMetadataInNamespace();
+ protected void printStreams(DistributedLogNamespace namespace) throws Exception {
+ Iterator<String> streams = namespace.getLogs();
System.out.println("Streams under " + getUri() + " : ");
System.out.println("--------------------------------");
- for (Map.Entry<String, byte[]> entry : streams.entrySet()) {
- println(entry.getKey());
- if (null == entry.getValue() || entry.getValue().length == 0) {
+ while (streams.hasNext()) {
+ String streamName = streams.next();
+ System.out.println(streamName);
+ if (!printMetadata) {
+ continue;
+ }
+ MetadataAccessor accessor =
+ namespace.getNamespaceDriver().getMetadataAccessor(streamName);
+ byte[] metadata = accessor.getMetadata();
+ if (null == metadata || metadata.length == 0) {
continue;
}
if (printHex) {
- System.out.println(Hex.encodeHexString(entry.getValue()));
+ System.out.println(Hex.encodeHexString(metadata));
} else {
- System.out.println(new String(entry.getValue(), UTF_8));
+ System.out.println(new String(metadata, UTF_8));
}
System.out.println("");
}
System.out.println("--------------------------------");
}
-
- protected void printStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
- Collection<String> streams = factory.enumerateAllLogsInNamespace();
- System.out.println("Streams under " + getUri() + " : ");
- System.out.println("--------------------------------");
- for (String stream : streams) {
- System.out.println(stream);
- }
- System.out.println("--------------------------------");
- }
}
public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener {
@@ -609,16 +609,17 @@ public class DistributedLogTool extends Tool {
private void inspectStreams(final SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
throws Exception {
- Collection<String> streamCollection = getFactory().enumerateAllLogsInNamespace();
+ Iterator<String> streamCollection = getNamespace().getLogs();
final List<String> streams = new ArrayList<String>();
- if (null != streamPrefix) {
- for (String s : streamCollection) {
+ while (streamCollection.hasNext()) {
+ String s = streamCollection.next();
+ if (null != streamPrefix) {
if (s.startsWith(streamPrefix)) {
streams.add(s);
}
+ } else {
+ streams.add(s);
}
- } else {
- streams.addAll(streamCollection);
}
if (0 == streams.size()) {
return;
@@ -660,8 +661,7 @@ public class DistributedLogTool extends Tool {
for (int i = startIdx; i < endIdx; i++) {
String s = streams.get(i);
BookKeeperClient bkc = getBookKeeperClient();
- DistributedLogManager dlm =
- getFactory().createDistributedLogManagerWithSharedClients(s);
+ DistributedLogManager dlm = getNamespace().openLog(s);
try {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
if (segments.size() <= 1) {
@@ -782,20 +782,21 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
getConf().setZkAclId(getZkAclId());
- return truncateStreams(getFactory());
+ return truncateStreams(getNamespace());
}
- private int truncateStreams(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
- Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+ private int truncateStreams(final DistributedLogNamespace namespace) throws Exception {
+ Iterator<String> streamCollection = namespace.getLogs();
final List<String> streams = new ArrayList<String>();
- if (null != streamPrefix) {
- for (String s : streamCollection) {
+ while (streamCollection.hasNext()) {
+ String s = streamCollection.next();
+ if (null != streamPrefix) {
if (s.startsWith(streamPrefix)) {
streams.add(s);
}
+ } else {
+ streams.add(s);
}
- } else {
- streams.addAll(streamCollection);
}
if (0 == streams.size()) {
return 0;
@@ -813,7 +814,7 @@ public class DistributedLogTool extends Tool {
@Override
public void run() {
try {
- truncateStreams(factory, streams, tid, numStreamsPerThreads);
+ truncateStreams(namespace, streams, tid, numStreamsPerThreads);
System.out.println("Thread " + tid + " finished.");
} catch (IOException e) {
System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
@@ -828,14 +829,13 @@ public class DistributedLogTool extends Tool {
return 0;
}
- private void truncateStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+ private void truncateStreams(DistributedLogNamespace namespace, List<String> streams,
int tid, int numStreamsPerThreads) throws IOException {
int startIdx = tid * numStreamsPerThreads;
int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
for (int i = startIdx; i < endIdx; i++) {
String s = streams.get(i);
- DistributedLogManager dlm =
- factory.createDistributedLogManagerWithSharedClients(s);
+ DistributedLogManager dlm = namespace.openLog(s);
try {
if (deleteStream) {
dlm.delete();
@@ -930,7 +930,7 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
- DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+ DistributedLogManager dlm = getNamespace().openLog(getStreamName());
try {
if (listEppStats) {
bkc = new SimpleBookKeeperClient(getConf(), getUri());
@@ -1078,7 +1078,7 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
- DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+ DistributedLogManager dlm = getNamespace().openLog(getStreamName());
try {
long count = 0;
if (null == endDLSN) {
@@ -1141,7 +1141,7 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
getConf().setZkAclId(getZkAclId());
- DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+ DistributedLogManager dlm = getNamespace().openLog(getStreamName());
try {
dlm.delete();
} finally {
@@ -1347,7 +1347,7 @@ public class DistributedLogTool extends Tool {
}
getConf().setZkAclId(getZkAclId());
for (String stream : streams) {
- getFactory().getNamespace().createLog(stream);
+ getNamespace().createLog(stream);
}
return 0;
}
@@ -1435,7 +1435,7 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
- DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+ DistributedLogManager dlm = getNamespace().openLog(getStreamName());
long totalCount = dlm.getLogRecordCount();
try {
AsyncLogReader reader;
@@ -1536,7 +1536,7 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
- DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName());
+ DistributedLogManager dlm = getNamespace().openLog(getStreamName());
try {
return inspectAndRepair(dlm.getLogSegments());
} finally {
@@ -2640,11 +2640,11 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
getConf().setZkAclId(getZkAclId());
- return truncateStream(getFactory(), getStreamName(), dlsn);
+ return truncateStream(getNamespace(), getStreamName(), dlsn);
}
- private int truncateStream(final com.twitter.distributedlog.DistributedLogManagerFactory factory, String streamName, DLSN dlsn) throws Exception {
- DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+ private int truncateStream(final DistributedLogNamespace namespace, String streamName, DLSN dlsn) throws Exception {
+ DistributedLogManager dlm = namespace.openLog(streamName);
try {
long totalRecords = dlm.getLogRecordCount();
long recordsAfterTruncate = Await.result(dlm.getLogRecordCountAsync(dlsn));
@@ -2731,7 +2731,6 @@ public class DistributedLogTool extends Tool {
int numThreads = 1;
String streamPrefix = null;
String subscriberId = null;
- AtomicInteger streamIndex = new AtomicInteger();
DeleteSubscriberCommand() {
super("delete_subscriber", "Delete the subscriber in subscription store. ");
@@ -2764,20 +2763,21 @@ public class DistributedLogTool extends Tool {
@Override
protected int runCmd() throws Exception {
getConf().setZkAclId(getZkAclId());
- return deleteSubscriber(getFactory());
+ return deleteSubscriber(getNamespace());
}
- private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
- Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+ private int deleteSubscriber(final DistributedLogNamespace namespace) throws Exception {
+ Iterator<String> streamCollection = namespace.getLogs();
final List<String> streams = new ArrayList<String>();
- if (null != streamPrefix) {
- for (String s : streamCollection) {
+ while (streamCollection.hasNext()) {
+ String s = streamCollection.next();
+ if (null != streamPrefix) {
if (s.startsWith(streamPrefix)) {
streams.add(s);
}
+ } else {
+ streams.add(s);
}
- } else {
- streams.addAll(streamCollection);
}
if (0 == streams.size()) {
return 0;
@@ -2796,7 +2796,7 @@ public class DistributedLogTool extends Tool {
@Override
public void run() {
try {
- deleteSubscriber(factory, streams, tid, numStreamsPerThreads);
+ deleteSubscriber(namespace, streams, tid, numStreamsPerThreads);
System.out.println("Thread " + tid + " finished.");
} catch (Exception e) {
System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
@@ -2811,14 +2811,13 @@ public class DistributedLogTool extends Tool {
return 0;
}
- private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+ private void deleteSubscriber(DistributedLogNamespace namespace, List<String> streams,
int tid, int numStreamsPerThreads) throws Exception {
int startIdx = tid * numStreamsPerThreads;
int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
for (int i = startIdx; i < endIdx; i++) {
final String s = streams.get(i);
- DistributedLogManager dlm =
- factory.createDistributedLogManagerWithSharedClients(s);
+ DistributedLogManager dlm = namespace.openLog(s);
final CountDownLatch countDownLatch = new CountDownLatch(1);
dlm.getSubscriptionsStore().deleteSubscriber(subscriberId)
.addEventListener(new FutureEventListener<Boolean>() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
index 63db1fe..2f9e091 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
@@ -17,34 +17,27 @@
*/
package com.twitter.distributedlog.util;
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.DistributedLogConstants;
import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
+import java.net.InetAddress;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Utilities about DL implementations like uri, log segments, metadata serialization and deserialization.
*/
public class DLUtils {
- static final Logger logger = LoggerFactory.getLogger(DLUtils.class);
-
- /**
- * Extract zk servers fro dl <i>uri</i>.
- *
- * @param uri
- * dl uri
- * @return zk servers
- */
- public static String getZKServersFromDLUri(URI uri) {
- return uri.getAuthority().replace(";", ",");
- }
-
/**
* Find the log segment whose transaction ids are not less than provided <code>transactionId</code>.
*
@@ -224,4 +217,105 @@ public class DLUtils {
public static long bytes2LogSegmentId(byte[] data) {
return Long.parseLong(new String(data, UTF_8));
}
+
+    /**
+     * Normalize a distributedlog uri by dropping the backend part of its scheme.
+     *
+     * <p>The scheme is lower-cased and split on '-'; only the first part is kept as the scheme
+     * of the returned uri. Authority, path, query and fragment are carried over unchanged.
+     *
+     * @param uri the distributedlog uri.
+     * @return the normalized uri
+     * @throws NullPointerException if <code>uri</code> is null or has no scheme
+     * @throws IllegalArgumentException if the first part of the scheme is not
+     *          {@link DistributedLogConstants#SCHEME_PREFIX}, or if the normalized uri
+     *          can not be constructed
+     */
+    public static URI normalizeURI(URI uri) {
+        checkNotNull(uri, "DistributedLog uri is null");
+        String scheme = uri.getScheme();
+        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
+        // Lower-case with a fixed locale: the default-locale variant maps 'I' to a dotless i
+        // under e.g. a Turkish locale, which would make a valid scheme fail the check below.
+        String[] schemeParts = StringUtils.split(scheme.toLowerCase(java.util.Locale.ROOT), '-');
+        String baseScheme = schemeParts[0];
+        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, baseScheme),
+                "Unknown distributedlog scheme found : " + uri);
+        try {
+            return new URI(
+                    baseScheme, // remove backend info
+                    uri.getAuthority(),
+                    uri.getPath(),
+                    uri.getQuery(),
+                    uri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e);
+        }
+    }
+
+    /**
+     * Build a client id from the local host address.
+     *
+     * @return the string form of {@link InetAddress#getLocalHost()}, or
+     *         {@link DistributedLogConstants#UNKNOWN_CLIENT_ID} if the lookup throws
+     */
+    private static String getHostIpLockClientId() {
+        String hostClientId;
+        try {
+            InetAddress localHost = InetAddress.getLocalHost();
+            hostClientId = localHost.toString();
+        } catch (Exception ignored) {
+            // best effort: any failure resolving the local host degrades to the unknown id
+            hostClientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
+        }
+        return hostClientId;
+    }
+
+    /**
+     * Normalize the client id.
+     *
+     * <p>A client id that is <code>null</code> or equal to
+     * {@link DistributedLogConstants#UNKNOWN_CLIENT_ID} is replaced by the id obtained from
+     * the local host address lookup; any other value is returned as is.
+     *
+     * @param clientId the client id to normalize, may be null
+     * @return the normalized client id.
+     */
+    public static String normalizeClientId(String clientId) {
+        // a missing client id is handled like the unknown one instead of failing with an NPE
+        if (null == clientId || clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
+            return getHostIpLockClientId();
+        }
+        return clientId;
+    }
+
+    /**
+     * Check whether a stream name is reserved in the bkdl namespace.
+     *
+     * <p>Names that begin with a '.' are reserved.
+     *
+     * @param name
+     *          stream name
+     * @return true if the name is reserved, otherwise false.
+     */
+    public static boolean isReservedStreamName(String name) {
+        final boolean hasLeadingDot = !name.isEmpty() && '.' == name.charAt(0);
+        return hasLeadingDot;
+    }
+
+    /**
+     * Validate the stream name.
+     *
+     * <p>A name is rejected if it is null, if it contains a null character, a '/', or a
+     * character in one of the ranges U+0001..U+001F, U+007F..U+009F, U+D800..U+F8FF or
+     * U+FFF0..U+FFFF (bounds inclusive), or if it is a reserved name
+     * (see {@link #isReservedStreamName(String)}). Characters are checked before the
+     * reserved-name rule, and the first offending character is the one reported.
+     *
+     * @param nameOfStream
+     *          name of stream
+     * @throws InvalidStreamNameException if the name is null, contains a forbidden character
+     *          or is reserved
+     */
+    public static void validateName(String nameOfStream)
+            throws InvalidStreamNameException {
+        if (null == nameOfStream) {
+            throw new InvalidStreamNameException(nameOfStream, "null stream name");
+        }
+        // validate the stream to see if meet zookeeper path's requirement
+        final int length = nameOfStream.length();
+        for (int i = 0; i < length; i++) {
+            final char c = nameOfStream.charAt(i);
+            if (c == 0) {
+                throw new InvalidStreamNameException(nameOfStream,
+                        "null character not allowed @" + i);
+            }
+            if (c == '/') {
+                throw new InvalidStreamNameException(nameOfStream,
+                        "'/' not allowed @" + i);
+            }
+            // Range bounds are inclusive. c is never 0 here, and the last range needs no
+            // upper bound because a char can not exceed U+FFFF.
+            if (c <= '\u001f'
+                    || (c >= '\u007f' && c <= '\u009f')
+                    || (c >= '\ud800' && c <= '\uf8ff')
+                    || c >= '\ufff0') {
+                throw new InvalidStreamNameException(nameOfStream,
+                        "invalid character @" + i);
+            }
+        }
+        if (isReservedStreamName(nameOfStream)) {
+            throw new InvalidStreamNameException(nameOfStream,
+                    "Stream Name is reserved");
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 266409e..f206a25 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -271,7 +271,7 @@ public class FutureUtils {
*
* @param throwable the cause of the exception
* @return the bk exception return code. if the exception isn't bk exceptions,
- * it would return bk exception code.
+ * it would return {@link BKException.Code#UnexpectedConditionException}.
*/
public static int bkResultCode(Throwable throwable) {
if (throwable instanceof BKException) {
@@ -455,13 +455,13 @@ public class FutureUtils {
* @param key submit key of the ordered scheduler
*/
public static <T> void setException(final Promise<T> promise,
- final Throwable throwable,
+ final Throwable cause,
OrderedScheduler scheduler,
Object key) {
scheduler.submit(key, new Runnable() {
@Override
public void run() {
- setException(promise, throwable);
+ setException(promise, cause);
}
});
}