You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:27 UTC
[22/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
deleted file mode 100644
index 91e6dec..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
-import com.twitter.distributedlog.bk.QuorumConfigProvider;
-import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * BookKeeper Based Entry Store
- */
-public class BKLogSegmentEntryStore implements
- LogSegmentEntryStore,
- AsyncCallback.OpenCallback,
- AsyncCallback.DeleteCallback {
-
- private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class);
-
- private static class OpenReaderRequest {
-
- private final LogSegmentMetadata segment;
- private final long startEntryId;
- private final Promise<LogSegmentEntryReader> openPromise;
-
- OpenReaderRequest(LogSegmentMetadata segment,
- long startEntryId) {
- this.segment = segment;
- this.startEntryId = startEntryId;
- this.openPromise = new Promise<LogSegmentEntryReader>();
- }
-
- }
-
- private static class DeleteLogSegmentRequest {
-
- private final LogSegmentMetadata segment;
- private final Promise<LogSegmentMetadata> deletePromise;
-
- DeleteLogSegmentRequest(LogSegmentMetadata segment) {
- this.segment = segment;
- this.deletePromise = new Promise<LogSegmentMetadata>();
- }
-
- }
-
- private final byte[] passwd;
- private final ZooKeeperClient zkc;
- private final BookKeeperClient bkc;
- private final OrderedScheduler scheduler;
- private final DistributedLogConfiguration conf;
- private final DynamicDistributedLogConfiguration dynConf;
- private final StatsLogger statsLogger;
- private final AsyncFailureInjector failureInjector;
- // ledger allocator
- private final LedgerAllocator allocator;
-
- public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
- DynamicDistributedLogConfiguration dynConf,
- ZooKeeperClient zkc,
- BookKeeperClient bkc,
- OrderedScheduler scheduler,
- LedgerAllocator allocator,
- StatsLogger statsLogger,
- AsyncFailureInjector failureInjector) {
- this.conf = conf;
- this.dynConf = dynConf;
- this.zkc = zkc;
- this.bkc = bkc;
- this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
- this.scheduler = scheduler;
- this.allocator = allocator;
- this.statsLogger = statsLogger;
- this.failureInjector = failureInjector;
- }
-
- @Override
- public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) {
- DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment);
- BookKeeper bk;
- try {
- bk = this.bkc.get();
- } catch (IOException e) {
- return Future.exception(e);
- }
- bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request);
- return request.deletePromise;
- }
-
- @Override
- public void deleteComplete(int rc, Object ctx) {
- DeleteLogSegmentRequest deleteRequest = (DeleteLogSegmentRequest) ctx;
- if (BKException.Code.NoSuchLedgerExistsException == rc) {
- logger.warn("No ledger {} found to delete for {}.",
- deleteRequest.segment.getLogSegmentId(), deleteRequest.segment);
- } else if (BKException.Code.OK != rc) {
- logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}",
- new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment,
- BKException.getMessage(rc) });
- FutureUtils.setException(deleteRequest.deletePromise,
- new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc));
- return;
- }
- FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment);
- }
-
- //
- // Writers
- //
-
- LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
- DynamicDistributedLogConfiguration dynConf)
- throws IOException {
- LedgerAllocator ledgerAllocatorDelegator;
- if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) {
- QuorumConfigProvider quorumConfigProvider =
- new DynamicQuorumConfigProvider(dynConf);
- LedgerAllocator allocator = new SimpleLedgerAllocator(
- logMetadata.getAllocationPath(),
- logMetadata.getAllocationData(),
- quorumConfigProvider,
- zkc,
- bkc);
- ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, true);
- } else {
- ledgerAllocatorDelegator = allocator;
- }
- return ledgerAllocatorDelegator;
- }
-
- @Override
- public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
- LogMetadataForWriter logMetadata,
- DynamicDistributedLogConfiguration dynConf) throws IOException {
- // Build the ledger allocator
- LedgerAllocator allocator = createLedgerAllocator(logMetadata, dynConf);
- return new BKLogSegmentAllocator(allocator);
- }
-
- //
- // Readers
- //
-
- @Override
- public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
- long startEntryId) {
- BookKeeper bk;
- try {
- bk = this.bkc.get();
- } catch (IOException e) {
- return Future.exception(e);
- }
- OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId);
- if (segment.isInProgress()) {
- bk.asyncOpenLedgerNoRecovery(
- segment.getLogSegmentId(),
- BookKeeper.DigestType.CRC32,
- passwd,
- this,
- request);
- } else {
- bk.asyncOpenLedger(
- segment.getLogSegmentId(),
- BookKeeper.DigestType.CRC32,
- passwd,
- this,
- request);
- }
- return request.openPromise;
- }
-
- @Override
- public void openComplete(int rc, LedgerHandle lh, Object ctx) {
- OpenReaderRequest request = (OpenReaderRequest) ctx;
- if (BKException.Code.OK != rc) {
- FutureUtils.setException(
- request.openPromise,
- new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc));
- return;
- }
- // successfully open a ledger
- try {
- LogSegmentEntryReader reader = new BKLogSegmentEntryReader(
- request.segment,
- lh,
- request.startEntryId,
- bkc.get(),
- scheduler,
- conf,
- statsLogger,
- failureInjector);
- FutureUtils.setValue(request.openPromise, reader);
- } catch (IOException e) {
- FutureUtils.setException(request.openPromise, e);
- }
-
- }
-
- @Override
- public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment,
- final boolean fence) {
- final BookKeeper bk;
- try {
- bk = this.bkc.get();
- } catch (IOException e) {
- return Future.exception(e);
- }
- final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>();
- AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() {
- @Override
- public void openComplete(int rc, LedgerHandle lh, Object ctx) {
- if (BKException.Code.OK != rc) {
- FutureUtils.setException(
- openPromise,
- new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc));
- return;
- }
- LogSegmentRandomAccessEntryReader reader = new BKLogSegmentRandomAccessEntryReader(
- segment,
- lh,
- conf);
- FutureUtils.setValue(openPromise, reader);
- }
- };
- if (segment.isInProgress() && !fence) {
- bk.asyncOpenLedgerNoRecovery(
- segment.getLogSegmentId(),
- BookKeeper.DigestType.CRC32,
- passwd,
- openCallback,
- null);
- } else {
- bk.asyncOpenLedger(
- segment.getLogSegmentId(),
- BookKeeper.DigestType.CRC32,
- passwd,
- openCallback,
- null);
- }
- return openPromise;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
deleted file mode 100644
index 34fe1c3..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-/**
- * Ledger based log segment entry writer.
- */
-public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter {
-
- private final LedgerHandle lh;
-
- public BKLogSegmentEntryWriter(LedgerHandle lh) {
- this.lh = lh;
- }
-
- @VisibleForTesting
- public LedgerHandle getLedgerHandle() {
- return this.lh;
- }
-
- @Override
- public long getLogSegmentId() {
- return lh.getId();
- }
-
- @Override
- public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) {
- lh.asyncClose(callback, ctx);
- }
-
- @Override
- public void asyncAddEntry(byte[] data, int offset, int length,
- AsyncCallback.AddCallback callback, Object ctx) {
- lh.asyncAddEntry(data, offset, length, callback, ctx);
- }
-
- @Override
- public long size() {
- return lh.getLength();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
deleted file mode 100644
index 9cec80c..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.List;
-
-/**
- * BookKeeper ledger based random access entry reader.
- */
-class BKLogSegmentRandomAccessEntryReader implements
- LogSegmentRandomAccessEntryReader,
- ReadCallback {
-
- private final long lssn;
- private final long startSequenceId;
- private final boolean envelopeEntries;
- private final boolean deserializeRecordSet;
- // state
- private final LogSegmentMetadata metadata;
- private final LedgerHandle lh;
- private Promise<Void> closePromise = null;
-
- BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata,
- LedgerHandle lh,
- DistributedLogConfiguration conf) {
- this.metadata = metadata;
- this.lssn = metadata.getLogSegmentSequenceNumber();
- this.startSequenceId = metadata.getStartSequenceId();
- this.envelopeEntries = metadata.getEnvelopeEntries();
- this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
- this.lh = lh;
- }
-
- @Override
- public long getLastAddConfirmed() {
- return lh.getLastAddConfirmed();
- }
-
- @Override
- public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) {
- Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>();
- lh.asyncReadEntries(startEntryId, endEntryId, this, promise);
- return promise;
- }
-
- Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
- return Entry.newBuilder()
- .setLogSegmentInfo(lssn, startSequenceId)
- .setEntryId(entry.getEntryId())
- .setEnvelopeEntry(envelopeEntries)
- .deserializeRecordSet(deserializeRecordSet)
- .setInputStream(entry.getEntryInputStream())
- .buildReader();
- }
-
- @Override
- public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
- Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx;
- if (BKException.Code.OK == rc) {
- List<Entry.Reader> entryList = Lists.newArrayList();
- while (entries.hasMoreElements()) {
- try {
- entryList.add(processReadEntry(entries.nextElement()));
- } catch (IOException ioe) {
- FutureUtils.setException(promise, ioe);
- return;
- }
- }
- FutureUtils.setValue(promise, entryList);
- } else {
- FutureUtils.setException(promise,
- new BKTransmitException("Failed to read entries :", rc));
- }
- }
-
- @Override
- public Future<Void> asyncClose() {
- final Promise<Void> closeFuture;
- synchronized (this) {
- if (null != closePromise) {
- return closePromise;
- }
- closeFuture = closePromise = new Promise<Void>();
- }
- BKUtils.closeLedgers(lh).proxyTo(closeFuture);
- return closeFuture;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
deleted file mode 100644
index c71c67e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.function.VoidFunctions;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.util.List;
-
-/**
- * BookKeeper Util Functions
- */
-public class BKUtils {
-
- /**
- * Close a ledger <i>lh</i>.
- *
- * @param lh ledger handle
- * @return future represents close result.
- */
- public static Future<Void> closeLedger(LedgerHandle lh) {
- final Promise<Void> closePromise = new Promise<Void>();
- lh.asyncClose(new AsyncCallback.CloseCallback() {
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- if (BKException.Code.OK != rc) {
- FutureUtils.setException(closePromise, BKException.create(rc));
- } else {
- FutureUtils.setValue(closePromise, null);
- }
- }
- }, null);
- return closePromise;
- }
-
- /**
- * Close a list of ledgers <i>lhs</i>.
- *
- * @param lhs a list of ledgers
- * @return future represents close results.
- */
- public static Future<Void> closeLedgers(LedgerHandle ... lhs) {
- List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length);
- for (LedgerHandle lh : lhs) {
- closeResults.add(closeLedger(lh));
- }
- return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
deleted file mode 100644
index 3e859fb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.metadata.DLConfig;
-import com.twitter.distributedlog.thrift.BKDLConfigFormat;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Configurations for BookKeeper based DL.
- */
-public class BKDLConfig implements DLConfig {
-
- private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class);
-
- private static final int BUFFER_SIZE = 4096;
- private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
- new ConcurrentHashMap<URI, DLConfig>();
-
- public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
- dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
- dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
- if (bkdlConfig.isFederatedNamespace()) {
- dlConf.setCreateStreamIfNotExists(false);
- LOG.info("Disabled createIfNotExists for federated namespace.");
- }
- LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
- " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
- new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
- dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
- bkdlConfig.isFederatedNamespace() });
- }
-
- public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
- DLConfig dlConfig = cachedDLConfigs.get(uri);
- if (dlConfig == null) {
- dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
- DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig);
- if (null != oldDLConfig) {
- dlConfig = oldDLConfig;
- }
- }
- assert (dlConfig instanceof BKDLConfig);
- return (BKDLConfig)dlConfig;
- }
-
- @VisibleForTesting
- public static void clearCachedDLConfigs() {
- cachedDLConfigs.clear();
- }
-
- private String bkZkServersForWriter;
- private String bkZkServersForReader;
- private String bkLedgersPath;
- private boolean sanityCheckTxnID = true;
- private boolean encodeRegionID = false;
- private String dlZkServersForWriter;
- private String dlZkServersForReader;
- private String aclRootPath;
- private Long firstLogSegmentSeqNo;
- private boolean isFederatedNamespace = false;
-
- /**
- * Construct a empty config with given <i>uri</i>.
- */
- public BKDLConfig(URI uri) {
- this(BKNamespaceDriver.getZKServersFromDLUri(uri),
- BKNamespaceDriver.getZKServersFromDLUri(uri),
- null, null, null);
- }
-
- /**
- * The caller should make sure both dl and bk use same zookeeper server.
- *
- * @param zkServers
- * zk servers used for both dl and bk.
- * @param ledgersPath
- * ledgers path.
- */
- @VisibleForTesting
- public BKDLConfig(String zkServers, String ledgersPath) {
- this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
- }
-
- public BKDLConfig(String dlZkServersForWriter,
- String dlZkServersForReader,
- String bkZkServersForWriter,
- String bkZkServersForReader,
- String bkLedgersPath) {
- this.dlZkServersForWriter = dlZkServersForWriter;
- this.dlZkServersForReader = dlZkServersForReader;
- this.bkZkServersForWriter = bkZkServersForWriter;
- this.bkZkServersForReader = bkZkServersForReader;
- this.bkLedgersPath = bkLedgersPath;
- }
-
- /**
- * @return zk servers used for bk for writers
- */
- public String getBkZkServersForWriter() {
- return bkZkServersForWriter;
- }
-
- /**
- * @return zk servers used for bk for readers
- */
- public String getBkZkServersForReader() {
- return bkZkServersForReader;
- }
-
- /**
- * @return zk servers used for dl for writers
- */
- public String getDlZkServersForWriter() {
- return dlZkServersForWriter;
- }
-
- /**
- * @return zk servers used for dl for readers
- */
- public String getDlZkServersForReader() {
- return dlZkServersForReader;
- }
-
- /**
- * @return ledgers path for bk
- */
- public String getBkLedgersPath() {
- return bkLedgersPath;
- }
-
- /**
- * Enable/Disable sanity check txn id.
- *
- * @param enabled
- * flag to enable/disable sanity check txn id.
- * @return bk dl config.
- */
- public BKDLConfig setSanityCheckTxnID(boolean enabled) {
- this.sanityCheckTxnID = enabled;
- return this;
- }
-
- /**
- * @return flag to sanity check highest txn id.
- */
- public boolean getSanityCheckTxnID() {
- return sanityCheckTxnID;
- }
-
- /**
- * Enable/Disable encode region id.
- *
- * @param enabled
- * flag to enable/disable encoding region id.
- * @return bk dl config
- */
- public BKDLConfig setEncodeRegionID(boolean enabled) {
- this.encodeRegionID = enabled;
- return this;
- }
-
- /**
- * @return flag to encode region id.
- */
- public boolean getEncodeRegionID() {
- return encodeRegionID;
- }
-
- /**
- * Set the root path of zk based ACL manager.
- *
- * @param aclRootPath
- * root path of zk based ACL manager.
- * @return bk dl config
- */
- public BKDLConfig setACLRootPath(String aclRootPath) {
- this.aclRootPath = aclRootPath;
- return this;
- }
-
- /**
- * Get the root path of zk based ACL manager.
- *
- * @return root path of zk based ACL manager.
- */
- public String getACLRootPath() {
- return aclRootPath;
- }
-
- /**
- * Set the value at which ledger sequence number should start for streams that are being
- * upgraded and did not have ledger sequence number to start with or for newly created
- * streams
- *
- * @param firstLogSegmentSeqNo first ledger sequence number
- * @return bk dl config
- */
- public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
- this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
- return this;
- }
-
- /**
- * Get the value at which ledger sequence number should start for streams that are being
- * upgraded and did not have ledger sequence number to start with or for newly created
- * streams
- *
- * @return first ledger sequence number
- */
- public Long getFirstLogSegmentSeqNo() {
- if (null == firstLogSegmentSeqNo) {
- return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
- }
- return firstLogSegmentSeqNo;
- }
-
- /**
- * Set the namespace to federated <i>isFederatedNamespace</i>.
- *
- * @param isFederatedNamespace
- * is the namespace federated?
- * @return bk dl config
- */
- public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
- this.isFederatedNamespace = isFederatedNamespace;
- return this;
- }
-
- /**
- * Whether the namespace is federated namespace
- *
- * @return true if the namespace is a federated namespace. otherwise false.
- */
- public boolean isFederatedNamespace() {
- return this.isFederatedNamespace;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
- dlZkServersForWriter, dlZkServersForReader,
- bkLedgersPath);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof BKDLConfig)) {
- return false;
- }
- BKDLConfig another = (BKDLConfig) o;
- return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) &&
- Objects.equal(bkZkServersForReader, another.bkZkServersForReader) &&
- Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) &&
- Objects.equal(dlZkServersForReader, another.dlZkServersForReader) &&
- Objects.equal(bkLedgersPath, another.bkLedgersPath) &&
- sanityCheckTxnID == another.sanityCheckTxnID &&
- encodeRegionID == another.encodeRegionID &&
- Objects.equal(aclRootPath, another.aclRootPath) &&
- Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) &&
- Objects.equal(isFederatedNamespace, another.isFederatedNamespace);
-
- }
-
- @Override
- public String toString() {
- return serialize();
- }
-
- @Override
- public String serialize() {
- BKDLConfigFormat configFormat = new BKDLConfigFormat();
- if (null != bkZkServersForWriter) {
- configFormat.setBkZkServers(bkZkServersForWriter);
- }
- if (null != bkZkServersForReader) {
- configFormat.setBkZkServersForReader(bkZkServersForReader);
- }
- if (null != dlZkServersForWriter) {
- configFormat.setDlZkServersForWriter(dlZkServersForWriter);
- }
- if (null != dlZkServersForReader) {
- configFormat.setDlZkServersForReader(dlZkServersForReader);
- }
- if (null != bkLedgersPath) {
- configFormat.setBkLedgersPath(bkLedgersPath);
- }
- configFormat.setSanityCheckTxnID(sanityCheckTxnID);
- configFormat.setEncodeRegionID(encodeRegionID);
- if (null != aclRootPath) {
- configFormat.setAclRootPath(aclRootPath);
- }
- if (null != firstLogSegmentSeqNo) {
- configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
- }
- if (isFederatedNamespace) {
- configFormat.setFederatedNamespace(true);
- }
- return serialize(configFormat);
- }
-
- String serialize(BKDLConfigFormat configFormat) {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- configFormat.write(protocol);
- transport.flush();
- return transport.toString("UTF-8");
- } catch (TException e) {
- throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
- }
- }
-
- @Override
- public void deserialize(byte[] data) throws IOException {
- BKDLConfigFormat configFormat = new BKDLConfigFormat();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- configFormat.read(protocol);
- } catch (TException e) {
- throw new IOException("Failed to deserialize data '" +
- new String(data, UTF_8) + "' : ", e);
- }
- // bookkeeper cluster settings
- if (configFormat.isSetBkZkServers()) {
- bkZkServersForWriter = configFormat.getBkZkServers();
- }
- if (configFormat.isSetBkZkServersForReader()) {
- bkZkServersForReader = configFormat.getBkZkServersForReader();
- } else {
- bkZkServersForReader = bkZkServersForWriter;
- }
- if (configFormat.isSetBkLedgersPath()) {
- bkLedgersPath = configFormat.getBkLedgersPath();
- }
- // dl zookeeper cluster settings
- if (configFormat.isSetDlZkServersForWriter()) {
- dlZkServersForWriter = configFormat.getDlZkServersForWriter();
- }
- if (configFormat.isSetDlZkServersForReader()) {
- dlZkServersForReader = configFormat.getDlZkServersForReader();
- } else {
- dlZkServersForReader = dlZkServersForWriter;
- }
- // dl settings
- sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID();
- encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID();
- if (configFormat.isSetAclRootPath()) {
- aclRootPath = configFormat.getAclRootPath();
- }
-
- if (configFormat.isSetFirstLogSegmentSeqNo()) {
- firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo();
- }
- isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace();
-
- // Validate the settings
- if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath ||
- null == dlZkServersForWriter || null == dlZkServersForReader) {
- throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
deleted file mode 100644
index c76a5a5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.LockCancelledException;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.distributedlog.zk.LimitedPermitManager;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.PermitManager;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static com.twitter.distributedlog.metadata.LogMetadata.*;
-
-/**
- * zookeeper based {@link LogStreamMetadataStore}
- */
-public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
-
- private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
-
- private final String clientId;
- private final DistributedLogConfiguration conf;
- private final ZooKeeperClient zooKeeperClient;
- private final OrderedScheduler scheduler;
- private final StatsLogger statsLogger;
- private final LogSegmentMetadataStore logSegmentStore;
- private final LimitedPermitManager permitManager;
- // lock
- private SessionLockFactory lockFactory;
- private OrderedScheduler lockStateExecutor;
-
- public ZKLogStreamMetadataStore(String clientId,
- DistributedLogConfiguration conf,
- ZooKeeperClient zkc,
- OrderedScheduler scheduler,
- StatsLogger statsLogger) {
- this.clientId = clientId;
- this.conf = conf;
- this.zooKeeperClient = zkc;
- this.scheduler = scheduler;
- this.statsLogger = statsLogger;
- // create the log segment metadata store and the permit manager (used for log segment rolling)
- this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
- this.permitManager = new LimitedPermitManager(
- conf.getLogSegmentRollingConcurrency(),
- 1,
- TimeUnit.MINUTES,
- scheduler);
- this.zooKeeperClient.register(permitManager);
- }
-
- private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
- if (createIfNull && null == lockStateExecutor) {
- StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler");
- lockStateExecutor = OrderedScheduler.newBuilder()
- .name("DLM-LockState")
- .corePoolSize(conf.getNumLockStateThreads())
- .statsLogger(lockStateStatsLogger)
- .perExecutorStatsLogger(lockStateStatsLogger)
- .traceTaskExecution(conf.getEnableTaskExecutionStats())
- .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
- .build();
- }
- return lockStateExecutor;
- }
-
- private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
- if (createIfNull && null == lockFactory) {
- lockFactory = new ZKSessionLockFactory(
- zooKeeperClient,
- clientId,
- getLockStateExecutor(createIfNull),
- conf.getZKNumRetries(),
- conf.getLockTimeoutMilliSeconds(),
- conf.getZKRetryBackoffStartMillis(),
- statsLogger);
- }
- return lockFactory;
- }
-
- @Override
- public void close() throws IOException {
- this.zooKeeperClient.unregister(permitManager);
- this.permitManager.close();
- this.logSegmentStore.close();
- SchedulerUtils.shutdownScheduler(
- getLockStateExecutor(false),
- conf.getSchedulerShutdownTimeoutMs(),
- TimeUnit.MILLISECONDS);
- }
-
- @Override
- public LogSegmentMetadataStore getLogSegmentMetadataStore() {
- return logSegmentStore;
- }
-
- @Override
- public PermitManager getPermitManager() {
- return this.permitManager;
- }
-
- @Override
- public Transaction<Object> newTransaction() {
- return new ZKTransaction(zooKeeperClient);
- }
-
- @Override
- public Future<Void> logExists(URI uri, final String logName) {
- final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
- uri, logName, conf.getUnpartitionedStreamName());
- final Promise<Void> promise = new Promise<Void>();
- try {
- final ZooKeeper zk = zooKeeperClient.get();
- zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int syncRc, String path, Object syncCtx) {
- if (KeeperException.Code.NONODE.intValue() == syncRc) {
- promise.setException(new LogNotFoundException(
- String.format("Log %s does not exist or has been deleted", logName)));
- return;
- } else if (KeeperException.Code.OK.intValue() != syncRc){
- promise.setException(new ZKException("Error on checking log existence for " + logName,
- KeeperException.create(KeeperException.Code.get(syncRc))));
- return;
- }
- zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- promise.setValue(null);
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- promise.setException(new LogNotFoundException(
- String.format("Log %s does not exist or has been deleted", logName)));
- } else {
- promise.setException(new ZKException("Error on checking log existence for " + logName,
- KeeperException.create(KeeperException.Code.get(rc))));
- }
- }
- }, null);
- }
- }, null);
-
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
- promise.setException(new DLInterruptedException("Interrupted while checking "
- + logSegmentsPath, ie));
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- }
- return promise;
- }
-
- //
- // Create Write Lock
- //
-
- @Override
- public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
- return new ZKDistributedLock(
- getLockStateExecutor(true),
- getLockFactory(true),
- metadata.getLockPath(),
- conf.getLockTimeoutMilliSeconds(),
- statsLogger);
- }
-
- //
- // Create Read Lock
- //
-
- private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
- final String readLockPath) {
- final Promise<Void> promise = new Promise<Void>();
- promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable t) {
- FutureUtils.setException(promise, new LockCancelledException(readLockPath,
- "Could not ensure read lock path", t));
- return null;
- }
- });
- Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
- Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
- new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
- new org.apache.zookeeper.AsyncCallback.StringCallback() {
- @Override
- public void processResult(final int rc, final String path, Object ctx, String name) {
- if (KeeperException.Code.NONODE.intValue() == rc) {
- FutureUtils.setException(promise, new LogNotFoundException(
- String.format("Log %s does not exist or has been deleted",
- logMetadata.getFullyQualifiedName())));
- } else if (KeeperException.Code.OK.intValue() == rc) {
- FutureUtils.setValue(promise, null);
- LOG.trace("Created path {}.", path);
- } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- FutureUtils.setValue(promise, null);
- LOG.trace("Path {} is already existed.", path);
- } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
- } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new DLInterruptedException(path));
- } else {
- FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- }, null);
- return promise;
- }
-
- @Override
- public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata,
- Optional<String> readerId) {
- final String readLockPath = metadata.getReadLockPath(readerId);
- return ensureReadLockPathExist(metadata, readLockPath).flatMap(
- new ExceptionalFunction<Void, Future<DistributedLock>>() {
- @Override
- public Future<DistributedLock> applyE(Void value) throws Throwable {
- // Unfortunately this has a blocking call which we should not execute on the
- // ZK completion thread
- return scheduler.apply(new ExceptionalFunction0<DistributedLock>() {
- @Override
- public DistributedLock applyE() throws Throwable {
- return new ZKDistributedLock(
- getLockStateExecutor(true),
- getLockFactory(true),
- readLockPath,
- conf.getLockTimeoutMilliSeconds(),
- statsLogger.scope("read_lock"));
- }
- });
- }
- });
- }
-
- //
- // Create Log
- //
-
- static class MetadataIndex {
- static final int LOG_ROOT_PARENT = 0;
- static final int LOG_ROOT = 1;
- static final int MAX_TXID = 2;
- static final int VERSION = 3;
- static final int LOCK = 4;
- static final int READ_LOCK = 5;
- static final int LOGSEGMENTS = 6;
- static final int ALLOCATION = 7;
- }
-
- static int bytesToInt(byte[] b) {
- assert b.length >= 4;
- return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
- }
-
- static byte[] intToBytes(int i) {
- return new byte[]{
- (byte) (i >> 24),
- (byte) (i >> 16),
- (byte) (i >> 8),
- (byte) (i)};
- }
-
- static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
- String logRootPath,
- boolean ownAllocator) {
- // Note re. persistent lock state initialization: the read lock persistent state (path) is
- // initialized here but only used in the read handler. The reason is its more convenient and
- // less error prone to manage all stream structure in one place.
- final String logRootParentPath = new File(logRootPath).getParent();
- final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
- final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
- final String lockPath = logRootPath + LOCK_PATH;
- final String readLockPath = logRootPath + READ_LOCK_PATH;
- final String versionPath = logRootPath + VERSION_PATH;
- final String allocationPath = logRootPath + ALLOCATION_PATH;
-
- int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
- List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
- checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
- checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
- checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
- checkFutures.add(Utils.zkGetData(zk, versionPath, false));
- checkFutures.add(Utils.zkGetData(zk, lockPath, false));
- checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
- checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
- if (ownAllocator) {
- checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
- }
-
- return Future.collect(checkFutures);
- }
-
- static boolean pathExists(Versioned<byte[]> metadata) {
- return null != metadata.getValue() && null != metadata.getVersion();
- }
-
- static void ensureMetadataExist(Versioned<byte[]> metadata) {
- Preconditions.checkNotNull(metadata.getValue());
- Preconditions.checkNotNull(metadata.getVersion());
- }
-
- static void createMissingMetadata(final ZooKeeper zk,
- final String logRootPath,
- final List<Versioned<byte[]>> metadatas,
- final List<ACL> acl,
- final boolean ownAllocator,
- final boolean createIfNotExists,
- final Promise<List<Versioned<byte[]>>> promise) {
- final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
- final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
- CreateMode createMode = CreateMode.PERSISTENT;
-
- // log root parent path
- if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
- pathsToCreate.add(null);
- } else {
- String logRootParentPath = new File(logRootPath).getParent();
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
-
- // log root path
- if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
-
- // max id
- if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
- pathsToCreate.add(null);
- } else {
- byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
- pathsToCreate.add(zeroTxnIdData);
- zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
- }
- // version
- if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
- pathsToCreate.add(null);
- } else {
- byte[] versionData = intToBytes(LAYOUT_VERSION);
- pathsToCreate.add(versionData);
- zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
- }
- // lock path
- if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- // read lock path
- if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- // log segments path
- if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
- pathsToCreate.add(null);
- } else {
- byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
- DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
- pathsToCreate.add(logSegmentsData);
- zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
- }
- // allocation path
- if (ownAllocator) {
- if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
- DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- }
- if (zkOps.isEmpty()) {
- // nothing missed
- promise.setValue(metadatas);
- return;
- }
- if (!createIfNotExists) {
- promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
- return;
- }
-
- zk.multi(zkOps, new AsyncCallback.MultiCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
- if (KeeperException.Code.OK.intValue() == rc) {
- List<Versioned<byte[]>> finalMetadatas =
- Lists.newArrayListWithExpectedSize(metadatas.size());
- for (int i = 0; i < pathsToCreate.size(); i++) {
- byte[] dataCreated = pathsToCreate.get(i);
- if (null == dataCreated) {
- finalMetadatas.add(metadatas.get(i));
- } else {
- finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
- }
- }
- promise.setValue(finalMetadatas);
- } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- promise.setException(new LogExistsException("Someone just created log "
- + logRootPath));
- } else {
- if (LOG.isDebugEnabled()) {
- StringBuilder builder = new StringBuilder();
- for (OpResult result : resultList) {
- if (result instanceof OpResult.ErrorResult) {
- OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
- builder.append(errorResult.getErr()).append(",");
- } else {
- builder.append(0).append(",");
- }
- }
- String resultCodeList = builder.substring(0, builder.length() - 1);
- LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
- }
-
- promise.setException(new ZKException("Failed to create log " + logRootPath,
- KeeperException.Code.get(rc)));
- }
- }
- }, null);
- }
-
- static LogMetadataForWriter processLogMetadatas(URI uri,
- String logName,
- String logIdentifier,
- List<Versioned<byte[]>> metadatas,
- boolean ownAllocator)
- throws UnexpectedException {
- try {
- // max id
- Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
- ensureMetadataExist(maxTxnIdData);
- // version
- Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
- ensureMetadataExist(maxTxnIdData);
- Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
- // lock path
- ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
- // read lock path
- ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
- // max lssn
- Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
- ensureMetadataExist(maxLSSNData);
- try {
- DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
- } catch (NumberFormatException nfe) {
- throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
- }
- // allocation path
- Versioned<byte[]> allocationData;
- if (ownAllocator) {
- allocationData = metadatas.get(MetadataIndex.ALLOCATION);
- ensureMetadataExist(allocationData);
- } else {
- allocationData = new Versioned<byte[]>(null, null);
- }
- return new LogMetadataForWriter(uri, logName, logIdentifier,
- maxLSSNData, maxTxnIdData, allocationData);
- } catch (IllegalArgumentException iae) {
- throw new UnexpectedException("Invalid log " + logName, iae);
- } catch (NullPointerException npe) {
- throw new UnexpectedException("Invalid log " + logName, npe);
- }
- }
-
- static Future<LogMetadataForWriter> getLog(final URI uri,
- final String logName,
- final String logIdentifier,
- final ZooKeeperClient zooKeeperClient,
- final boolean ownAllocator,
- final boolean createIfNotExists) {
- final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
- try {
- PathUtils.validatePath(logRootPath);
- } catch (IllegalArgumentException e) {
- LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
- return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
- }
-
- try {
- final ZooKeeper zk = zooKeeperClient.get();
- return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
- .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
- @Override
- public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
- Promise<List<Versioned<byte[]>>> promise =
- new Promise<List<Versioned<byte[]>>>();
- createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
- ownAllocator, createIfNotExists, promise);
- return promise;
- }
- }).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() {
- @Override
- public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
- return processLogMetadatas(
- uri,
- logName,
- logIdentifier,
- metadatas,
- ownAllocator);
- }
- });
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
- KeeperException.Code.CONNECTIONLOSS));
- } catch (InterruptedException e) {
- return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
- }
- }
-
- @Override
- public Future<LogMetadataForWriter> getLog(final URI uri,
- final String logName,
- final boolean ownAllocator,
- final boolean createIfNotExists) {
- return getLog(
- uri,
- logName,
- conf.getUnpartitionedStreamName(),
- zooKeeperClient,
- ownAllocator,
- createIfNotExists);
- }
-
- //
- // Delete Log
- //
-
- @Override
- public Future<Void> deleteLog(URI uri, final String logName) {
- final Promise<Void> promise = new Promise<Void>();
- try {
- String streamPath = LogMetadata.getLogStreamPath(uri, logName);
- ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (KeeperException.Code.OK.intValue() != rc) {
- FutureUtils.setException(promise,
- new ZKException("Encountered zookeeper issue on deleting log stream "
- + logName, KeeperException.Code.get(rc)));
- return;
- }
- FutureUtils.setValue(promise, null);
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
- + logName, KeeperException.Code.CONNECTIONLOSS));
- } catch (InterruptedException e) {
- FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
- + logName));
- } catch (KeeperException e) {
- FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
- + logName, e));
- }
- return promise;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
deleted file mode 100644
index 6b7a231..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.twitter.distributedlog.metadata.MetadataResolver;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.net.URI;
-
-public class ZkMetadataResolver implements MetadataResolver {
-
- private final ZooKeeperClient zkc;
-
- public ZkMetadataResolver(ZooKeeperClient zkc) {
- this.zkc = zkc;
- }
-
- @Override
- public DLMetadata resolve(URI uri) throws IOException {
- String dlPath = uri.getPath();
- PathUtils.validatePath(dlPath);
- // Normal case the dl metadata is stored in the last segment
- // so lookup last segment first.
- String[] parts = StringUtils.split(dlPath, '/');
- if (null == parts || 0 == parts.length) {
- throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath);
- }
- for (int i = parts.length; i >= 0; i--) {
- String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i));
- byte[] data;
- try {
- data = zkc.get().getData(pathToResolve, false, new Stat());
- } catch (KeeperException.NoNodeException nne) {
- continue;
- } catch (KeeperException ke) {
- throw new IOException("Fail to resolve dl path : " + pathToResolve);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted when resolving dl path : " + pathToResolve);
- }
- if (null == data || data.length == 0) {
- continue;
- }
- try {
- return DLMetadata.deserialize(uri, data);
- } catch (IOException ie) {
- throw new IOException("Failed to deserialize uri : " + uri);
- }
- }
- throw new IOException("No bkdl config bound under dl path : " + dlPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
deleted file mode 100644
index 7c5c2e4..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * The BookKeeper Based DistributedLog Implementation.
- */
-package com.twitter.distributedlog.impl;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
deleted file mode 100644
index b067ee9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.subscription;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-public class ZKSubscriptionStateStore implements SubscriptionStateStore {
-
- static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class);
-
- private final ZooKeeperClient zooKeeperClient;
- private final String zkPath;
- private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
-
- public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
- this.zooKeeperClient = zooKeeperClient;
- this.zkPath = zkPath;
- }
-
- @Override
- public void close() throws IOException {
- }
-
- /**
- * Get the last committed position stored for this subscription
- */
- @Override
- public Future<DLSN> getLastCommitPosition() {
- if (null != lastCommittedPosition.get()) {
- return Future.value(lastCommittedPosition.get());
- } else {
- return getLastCommitPositionFromZK();
- }
- }
-
- Future<DLSN> getLastCommitPositionFromZK() {
- final Promise<DLSN> result = new Promise<DLSN>();
- try {
- logger.debug("Reading last commit position from path {}", zkPath);
- zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
- if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(DLSN.NonInclusiveLowerBound);
- } else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- try {
- DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
- result.setValue(dlsn);
- } catch (Exception t) {
- logger.warn("Invalid last commit position found from path {}", zkPath, t);
- // invalid dlsn recorded in subscription state store
- result.setValue(DLSN.NonInclusiveLowerBound);
- }
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
- } catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
- }
- return result;
- }
-
- /**
- * Advances the position associated with the subscriber
- *
- * @param newPosition - new commit position
- */
- @Override
- public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
- if (null == lastCommittedPosition.get() ||
- (newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
- lastCommittedPosition.set(newPosition);
- return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
- zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
- zooKeeperClient.getDefaultACL(),
- CreateMode.PERSISTENT);
- } else {
- return Future.Done();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
deleted file mode 100644
index 17ba943..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.subscription;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * ZooKeeper Based Subscriptions Store.
- */
-public class ZKSubscriptionsStore implements SubscriptionsStore {
-
- private final ZooKeeperClient zkc;
- private final String zkPath;
- private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
- new ConcurrentHashMap<String, ZKSubscriptionStateStore>();
-
- public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) {
- this.zkc = zkc;
- this.zkPath = zkPath;
- }
-
- private ZKSubscriptionStateStore getSubscriber(String subscriberId) {
- ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
- if (ss == null) {
- ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
- getSubscriberZKPath(subscriberId));
- ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
- if (oldSS == null) {
- ss = newSS;
- } else {
- try {
- newSS.close();
- } catch (IOException e) {
- // ignore the exception
- }
- ss = oldSS;
- }
- }
- return ss;
- }
-
- private String getSubscriberZKPath(String subscriberId) {
- return String.format("%s/%s", zkPath, subscriberId);
- }
-
- @Override
- public Future<DLSN> getLastCommitPosition(String subscriberId) {
- return getSubscriber(subscriberId).getLastCommitPosition();
- }
-
- @Override
- public Future<Map<String, DLSN>> getLastCommitPositions() {
- final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
- try {
- this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(new HashMap<String, DLSN>());
- } else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- getLastCommitPositions(result, children);
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
- } catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
- }
- return result;
- }
-
- private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
- List<String> subscribers) {
- List<Future<Pair<String, DLSN>>> futures =
- new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
- for (String s : subscribers) {
- final String subscriber = s;
- Future<Pair<String, DLSN>> future =
- // Get the last commit position from zookeeper
- getSubscriber(subscriber).getLastCommitPositionFromZK().map(
- new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
- @Override
- public Pair<String, DLSN> apply(DLSN dlsn) {
- return Pair.of(subscriber, dlsn);
- }
- });
- futures.add(future);
- }
- Future.collect(futures).foreach(
- new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
- Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
- for (Pair<String, DLSN> pair : subscriptions) {
- subscriptionMap.put(pair.getLeft(), pair.getRight());
- }
- result.setValue(subscriptionMap);
- return BoxedUnit.UNIT;
- }
- });
- }
-
- @Override
- public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
- return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
- }
-
- @Override
- public Future<Boolean> deleteSubscriber(String subscriberId) {
- subscribers.remove(subscriberId);
- String path = getSubscriberZKPath(subscriberId);
- return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
- }
-
- @Override
- public void close() throws IOException {
- // no-op
- for (SubscriptionStateStore store : subscribers.values()) {
- store.close();
- }
- }
-
-}