You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:27 UTC

[22/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
deleted file mode 100644
index 91e6dec..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
-import com.twitter.distributedlog.bk.QuorumConfigProvider;
-import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * BookKeeper Based Entry Store
- */
-public class BKLogSegmentEntryStore implements
-        LogSegmentEntryStore,
-        AsyncCallback.OpenCallback,
-        AsyncCallback.DeleteCallback {
-
-    // Named after this class so that log lines are attributed to the entry store.
-    private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryStore.class);
-
-    /**
-     * Context object passed to BookKeeper when a ledger is opened for reading.
-     * It carries the segment being opened, the entry id to start reading from and
-     * the promise that {@code openComplete} satisfies with the reader (or a failure).
-     */
-    private static class OpenReaderRequest {
-
-        private final LogSegmentMetadata segment;
-        private final long startEntryId;
-        // created up front; completed later from the open callback
-        private final Promise<LogSegmentEntryReader> openPromise =
-                new Promise<LogSegmentEntryReader>();
-
-        OpenReaderRequest(LogSegmentMetadata logSegment, long firstEntryId) {
-            segment = logSegment;
-            startEntryId = firstEntryId;
-        }
-
-    }
-
-    /**
-     * Context object passed to BookKeeper when a ledger is deleted.
-     * It carries the segment being deleted and the promise that
-     * {@code deleteComplete} satisfies with that segment (or a failure).
-     */
-    private static class DeleteLogSegmentRequest {
-
-        private final LogSegmentMetadata segment;
-        // created up front; completed later from the delete callback
-        private final Promise<LogSegmentMetadata> deletePromise =
-                new Promise<LogSegmentMetadata>();
-
-        DeleteLogSegmentRequest(LogSegmentMetadata logSegment) {
-            segment = logSegment;
-        }
-
-    }
-
-    private final byte[] passwd;
-    private final ZooKeeperClient zkc;
-    private final BookKeeperClient bkc;
-    private final OrderedScheduler scheduler;
-    private final DistributedLogConfiguration conf;
-    private final DynamicDistributedLogConfiguration dynConf;
-    private final StatsLogger statsLogger;
-    private final AsyncFailureInjector failureInjector;
-    // ledger allocator
-    private final LedgerAllocator allocator;
-
-    /**
-     * Create an entry store backed by BookKeeper.
-     *
-     * @param conf distributedlog configuration; its BK digest password is used to open ledgers
-     * @param dynConf dynamic distributedlog configuration
-     * @param zkc zookeeper client
-     * @param bkc bookkeeper client
-     * @param scheduler scheduler handed to the entry readers opened by this store
-     * @param allocator ledger allocator; may be null, in which case a per-log allocator is
-     *                  built in {@code createLedgerAllocator}
-     * @param statsLogger stats logger handed to the entry readers opened by this store
-     * @param failureInjector failure injector handed to the entry readers opened by this store
-     */
-    public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
-                                  DynamicDistributedLogConfiguration dynConf,
-                                  ZooKeeperClient zkc,
-                                  BookKeeperClient bkc,
-                                  OrderedScheduler scheduler,
-                                  LedgerAllocator allocator,
-                                  StatsLogger statsLogger,
-                                  AsyncFailureInjector failureInjector) {
-        // configuration and values derived from it
-        this.conf = conf;
-        this.dynConf = dynConf;
-        this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
-        // clients
-        this.zkc = zkc;
-        this.bkc = bkc;
-        // collaborators
-        this.allocator = allocator;
-        this.scheduler = scheduler;
-        this.failureInjector = failureInjector;
-        this.statsLogger = statsLogger;
-    }
-
-    /**
-     * Delete the ledger that backs the given log segment.
-     *
-     * @param segment log segment whose ledger is deleted
-     * @return a future completed by {@link #deleteComplete(int, Object)}, or an already
-     *         failed future if obtaining the BookKeeper handle throws an IOException
-     */
-    @Override
-    public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) {
-        final BookKeeper bookKeeper;
-        try {
-            bookKeeper = bkc.get();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        DeleteLogSegmentRequest deleteRequest = new DeleteLogSegmentRequest(segment);
-        // this store is the delete callback; the request travels as the callback context
-        bookKeeper.asyncDeleteLedger(segment.getLogSegmentId(), this, deleteRequest);
-        return deleteRequest.deletePromise;
-    }
-
-    @Override
-    public void deleteComplete(int rc, Object ctx) {
-        DeleteLogSegmentRequest deleteRequest = (DeleteLogSegmentRequest) ctx;
-        if (BKException.Code.NoSuchLedgerExistsException == rc) {
-            logger.warn("No ledger {} found to delete for {}.",
-                    deleteRequest.segment.getLogSegmentId(), deleteRequest.segment);
-        } else if (BKException.Code.OK != rc) {
-            logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}",
-                    new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment,
-                            BKException.getMessage(rc) });
-            FutureUtils.setException(deleteRequest.deletePromise,
-                    new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc));
-            return;
-        }
-        FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment);
-    }
-
-    //
-    // Writers
-    //
-
-    /**
-     * Choose the ledger allocator used for a log.
-     *
-     * <p>The allocator given to this store is returned as is when it is non-null and
-     * <i>dynConf</i> enables the ledger allocator pool. Otherwise a new
-     * SimpleLedgerAllocator is built from the log's allocation path and data and
-     * wrapped in a LedgerAllocatorDelegator.
-     *
-     * @param logMetadata metadata of the log being written
-     * @param dynConf dynamic configuration consulted for the pool flag and quorum settings
-     * @return the ledger allocator to use
-     * @throws IOException declared for the allocator construction
-     */
-    LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
-                                          DynamicDistributedLogConfiguration dynConf)
-            throws IOException {
-        boolean useSharedAllocator = null != allocator && dynConf.getEnableLedgerAllocatorPool();
-        if (useSharedAllocator) {
-            return allocator;
-        }
-        QuorumConfigProvider quorumConfigProvider = new DynamicQuorumConfigProvider(dynConf);
-        LedgerAllocator perLogAllocator = new SimpleLedgerAllocator(
-                logMetadata.getAllocationPath(),
-                logMetadata.getAllocationData(),
-                quorumConfigProvider,
-                zkc,
-                bkc);
-        return new LedgerAllocatorDelegator(perLogAllocator, true);
-    }
-
-    /**
-     * Create the log segment allocator for a log: a BKLogSegmentAllocator on top of
-     * the ledger allocator chosen by {@code createLedgerAllocator}.
-     *
-     * @param logMetadata metadata of the log being written
-     * @param dynConf dynamic configuration of the log
-     * @return allocator of log segment entry writers
-     * @throws IOException if the ledger allocator cannot be created
-     */
-    @Override
-    public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
-            LogMetadataForWriter logMetadata,
-            DynamicDistributedLogConfiguration dynConf) throws IOException {
-        LedgerAllocator ledgerAllocator = createLedgerAllocator(logMetadata, dynConf);
-        return new BKLogSegmentAllocator(ledgerAllocator);
-    }
-
-    //
-    // Readers
-    //
-
-    /**
-     * Open a reader on the ledger of the given log segment.
-     *
-     * <p>An in-progress segment is opened without recovery; any other segment is
-     * opened with the regular (recovering) open call.
-     *
-     * @param segment log segment to read
-     * @param startEntryId entry id the reader starts from
-     * @return a future completed by {@link #openComplete(int, LedgerHandle, Object)}, or an
-     *         already failed future if obtaining the BookKeeper handle throws an IOException
-     */
-    @Override
-    public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
-                                                    long startEntryId) {
-        final BookKeeper bookKeeper;
-        try {
-            bookKeeper = bkc.get();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        OpenReaderRequest openRequest = new OpenReaderRequest(segment, startEntryId);
-        boolean inProgress = segment.isInProgress();
-        long ledgerId = segment.getLogSegmentId();
-        if (inProgress) {
-            bookKeeper.asyncOpenLedgerNoRecovery(
-                    ledgerId, BookKeeper.DigestType.CRC32, passwd, this, openRequest);
-        } else {
-            bookKeeper.asyncOpenLedger(
-                    ledgerId, BookKeeper.DigestType.CRC32, passwd, this, openRequest);
-        }
-        return openRequest.openPromise;
-    }
-
-    /**
-     * BookKeeper callback for the open started in {@code openReader}.
-     * On success the ledger handle is wrapped in a BKLogSegmentEntryReader and the
-     * request's promise is satisfied with it; otherwise the promise is failed.
-     *
-     * @param rc bookkeeper return code
-     * @param lh handle of the opened ledger
-     * @param ctx the {@code OpenReaderRequest} passed when the open was issued
-     */
-    @Override
-    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-        OpenReaderRequest openRequest = (OpenReaderRequest) ctx;
-        if (BKException.Code.OK == rc) {
-            // successfully open a ledger
-            // NOTE(review): lh is not closed when the catch branch below is taken - confirm
-            // whether the handle needs to be released on that path.
-            try {
-                BookKeeper bookKeeper = bkc.get();
-                LogSegmentEntryReader entryReader = new BKLogSegmentEntryReader(
-                        openRequest.segment,
-                        lh,
-                        openRequest.startEntryId,
-                        bookKeeper,
-                        scheduler,
-                        conf,
-                        statsLogger,
-                        failureInjector);
-                FutureUtils.setValue(openRequest.openPromise, entryReader);
-            } catch (IOException ioe) {
-                FutureUtils.setException(openRequest.openPromise, ioe);
-            }
-            return;
-        }
-        FutureUtils.setException(
-                openRequest.openPromise,
-                new BKTransmitException(
-                        "Failed to open ledger handle for log segment " + openRequest.segment, rc));
-    }
-
-    /**
-     * Open a random access reader on the ledger of the given log segment.
-     *
-     * <p>The ledger is opened without recovery only when the segment is in progress
-     * and <i>fence</i> is false; in every other case the regular open call is used.
-     *
-     * @param segment log segment to read
-     * @param fence whether an in-progress segment should still go through the regular open
-     * @return a future satisfied with the reader, or failed with a BKTransmitException if
-     *         the open fails, or with the IOException thrown while obtaining the
-     *         BookKeeper handle
-     */
-    @Override
-    public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment,
-                                                                            final boolean fence) {
-        final BookKeeper bookKeeper;
-        try {
-            bookKeeper = bkc.get();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        final Promise<LogSegmentRandomAccessEntryReader> readerPromise =
-                new Promise<LogSegmentRandomAccessEntryReader>();
-        AsyncCallback.OpenCallback onOpened = new AsyncCallback.OpenCallback() {
-            @Override
-            public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    LogSegmentRandomAccessEntryReader randomAccessReader =
-                            new BKLogSegmentRandomAccessEntryReader(segment, lh, conf);
-                    FutureUtils.setValue(readerPromise, randomAccessReader);
-                    return;
-                }
-                FutureUtils.setException(
-                        readerPromise,
-                        new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc));
-            }
-        };
-        boolean skipRecovery = segment.isInProgress() && !fence;
-        long ledgerId = segment.getLogSegmentId();
-        if (skipRecovery) {
-            bookKeeper.asyncOpenLedgerNoRecovery(
-                    ledgerId, BookKeeper.DigestType.CRC32, passwd, onOpened, null);
-        } else {
-            bookKeeper.asyncOpenLedger(
-                    ledgerId, BookKeeper.DigestType.CRC32, passwd, onOpened, null);
-        }
-        return readerPromise;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
deleted file mode 100644
index 34fe1c3..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-/**
- * {@link LogSegmentEntryWriter} that forwards every operation to a BookKeeper
- * {@link LedgerHandle}.
- */
-public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter {
-
-    // ledger handle every call is delegated to
-    private final LedgerHandle ledgerHandle;
-
-    /**
-     * @param lh handle of the ledger that stores the log segment
-     */
-    public BKLogSegmentEntryWriter(LedgerHandle lh) {
-        ledgerHandle = lh;
-    }
-
-    /**
-     * @return the underlying ledger handle
-     */
-    @VisibleForTesting
-    public LedgerHandle getLedgerHandle() {
-        return ledgerHandle;
-    }
-
-    /**
-     * @return the id of the underlying ledger, used as the log segment id
-     */
-    @Override
-    public long getLogSegmentId() {
-        return ledgerHandle.getId();
-    }
-
-    /**
-     * Close the underlying ledger asynchronously.
-     */
-    @Override
-    public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) {
-        ledgerHandle.asyncClose(callback, ctx);
-    }
-
-    /**
-     * Add <i>length</i> bytes of <i>data</i>, starting at <i>offset</i>, as one entry
-     * of the underlying ledger.
-     */
-    @Override
-    public void asyncAddEntry(byte[] data, int offset, int length,
-                              AsyncCallback.AddCallback callback, Object ctx) {
-        ledgerHandle.asyncAddEntry(data, offset, length, callback, ctx);
-    }
-
-    /**
-     * @return the length reported by the underlying ledger handle
-     */
-    @Override
-    public long size() {
-        return ledgerHandle.getLength();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
deleted file mode 100644
index 9cec80c..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.List;
-
-/**
- * BookKeeper ledger based random access entry reader.
- */
-class BKLogSegmentRandomAccessEntryReader implements
-        LogSegmentRandomAccessEntryReader,
-        ReadCallback {
-
-    private final long lssn;
-    private final long startSequenceId;
-    private final boolean envelopeEntries;
-    private final boolean deserializeRecordSet;
-    // state
-    private final LogSegmentMetadata metadata;
-    private final LedgerHandle lh;
-    private Promise<Void> closePromise = null;
-
-    BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata,
-                                        LedgerHandle lh,
-                                        DistributedLogConfiguration conf) {
-        this.metadata = metadata;
-        this.lssn = metadata.getLogSegmentSequenceNumber();
-        this.startSequenceId = metadata.getStartSequenceId();
-        this.envelopeEntries = metadata.getEnvelopeEntries();
-        this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
-        this.lh = lh;
-    }
-
-    @Override
-    public long getLastAddConfirmed() {
-        return lh.getLastAddConfirmed();
-    }
-
-    @Override
-    public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) {
-        Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>();
-        lh.asyncReadEntries(startEntryId, endEntryId, this, promise);
-        return promise;
-    }
-
-    Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
-        return Entry.newBuilder()
-                .setLogSegmentInfo(lssn, startSequenceId)
-                .setEntryId(entry.getEntryId())
-                .setEnvelopeEntry(envelopeEntries)
-                .deserializeRecordSet(deserializeRecordSet)
-                .setInputStream(entry.getEntryInputStream())
-                .buildReader();
-    }
-
-    @Override
-    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
-        Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx;
-        if (BKException.Code.OK == rc) {
-            List<Entry.Reader> entryList = Lists.newArrayList();
-            while (entries.hasMoreElements()) {
-                try {
-                    entryList.add(processReadEntry(entries.nextElement()));
-                } catch (IOException ioe) {
-                    FutureUtils.setException(promise, ioe);
-                    return;
-                }
-            }
-            FutureUtils.setValue(promise, entryList);
-        } else {
-            FutureUtils.setException(promise,
-                    new BKTransmitException("Failed to read entries :", rc));
-        }
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        BKUtils.closeLedgers(lh).proxyTo(closeFuture);
-        return closeFuture;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
deleted file mode 100644
index c71c67e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.function.VoidFunctions;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.util.List;
-
-/**
- * BookKeeper Util Functions
- */
-public class BKUtils {
-
-    /**
-     * Close a ledger <i>lh</i>.
-     *
-     * @param lh ledger handle
-     * @return future represents close result.
-     */
-    public static Future<Void> closeLedger(LedgerHandle lh) {
-        final Promise<Void> closePromise = new Promise<Void>();
-        lh.asyncClose(new AsyncCallback.CloseCallback() {
-            @Override
-            public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-                if (BKException.Code.OK != rc) {
-                    FutureUtils.setException(closePromise, BKException.create(rc));
-                } else {
-                    FutureUtils.setValue(closePromise, null);
-                }
-            }
-        }, null);
-        return closePromise;
-    }
-
-    /**
-     * Close a list of ledgers <i>lhs</i>.
-     *
-     * @param lhs a list of ledgers
-     * @return future represents close results.
-     */
-    public static Future<Void> closeLedgers(LedgerHandle ... lhs) {
-        List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length);
-        for (LedgerHandle lh : lhs) {
-            closeResults.add(closeLedger(lh));
-        }
-        return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
deleted file mode 100644
index 3e859fb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.metadata.DLConfig;
-import com.twitter.distributedlog.thrift.BKDLConfigFormat;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Configurations for BookKeeper based DL.
- */
-public class BKDLConfig implements DLConfig {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class);
-
-    private static final int BUFFER_SIZE = 4096;
-    private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
-            new ConcurrentHashMap<URI, DLConfig>();
-
-    public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
-        dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
-        dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
-        if (bkdlConfig.isFederatedNamespace()) {
-            dlConf.setCreateStreamIfNotExists(false);
-            LOG.info("Disabled createIfNotExists for federated namespace.");
-        }
-        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
-                        " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
-                new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
-                        dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
-                        bkdlConfig.isFederatedNamespace() });
-    }
-
-    /**
-     * Resolve the BKDLConfig for <i>uri</i>, consulting the per-URI cache first.
-     *
-     * <p>On a cache miss the config is resolved through a ZkMetadataResolver and
-     * published with putIfAbsent; if another caller published one first, that
-     * earlier instance is returned instead.
-     *
-     * @param zkc zookeeper client used to resolve the metadata
-     * @param uri distributedlog uri
-     * @return the config bound to <i>uri</i>
-     * @throws IOException if the metadata cannot be resolved
-     */
-    public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
-        DLConfig resolved = cachedDLConfigs.get(uri);
-        if (null == resolved) {
-            DLConfig fresh = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
-            DLConfig winner = cachedDLConfigs.putIfAbsent(uri, fresh);
-            if (null == winner) {
-                resolved = fresh;
-            } else {
-                resolved = winner;
-            }
-        }
-        assert (resolved instanceof BKDLConfig);
-        return (BKDLConfig) resolved;
-    }
-
-    /**
-     * Drop every entry of the per-URI config cache used by {@code resolveDLConfig}.
-     */
-    @VisibleForTesting
-    public static void clearCachedDLConfigs() {
-        cachedDLConfigs.clear();
-    }
-
-    private String bkZkServersForWriter;
-    private String bkZkServersForReader;
-    private String bkLedgersPath;
-    private boolean sanityCheckTxnID = true;
-    private boolean encodeRegionID = false;
-    private String dlZkServersForWriter;
-    private String dlZkServersForReader;
-    private String aclRootPath;
-    private Long firstLogSegmentSeqNo;
-    private boolean isFederatedNamespace = false;
-
-    /**
-     * Construct an empty config for the given <i>uri</i>.
-     *
-     * <p>The zookeeper servers extracted from the uri are used as the dl zookeeper
-     * servers for both writers and readers; the bk zookeeper servers and the ledgers
-     * path are left null.
-     *
-     * @param uri
-     *          distributedlog uri.
-     */
-    public BKDLConfig(URI uri) {
-        this(BKNamespaceDriver.getZKServersFromDLUri(uri),
-             BKNamespaceDriver.getZKServersFromDLUri(uri),
-             null, null, null);
-    }
-
-    /**
-     * Construct a config in which dl and bk, for both writers and readers, all use the
-     * same zookeeper servers.
-     *
-     * <p>The caller should make sure both dl and bk use the same zookeeper servers.
-     *
-     * @param zkServers
-     *          zk servers used for both dl and bk.
-     * @param ledgersPath
-     *          ledgers path.
-     */
-    @VisibleForTesting
-    public BKDLConfig(String zkServers, String ledgersPath) {
-        this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
-    }
-
-    /**
-     * Construct a config with separate zookeeper servers for dl and bk, and for
-     * writers and readers.
-     *
-     * @param dlZkServersForWriter
-     *          zk servers used for dl by writers.
-     * @param dlZkServersForReader
-     *          zk servers used for dl by readers.
-     * @param bkZkServersForWriter
-     *          zk servers used for bk by writers.
-     * @param bkZkServersForReader
-     *          zk servers used for bk by readers.
-     * @param bkLedgersPath
-     *          ledgers path for bk.
-     */
-    public BKDLConfig(String dlZkServersForWriter,
-                      String dlZkServersForReader,
-                      String bkZkServersForWriter,
-                      String bkZkServersForReader,
-                      String bkLedgersPath) {
-        // bookkeeper side
-        this.bkLedgersPath = bkLedgersPath;
-        this.bkZkServersForReader = bkZkServersForReader;
-        this.bkZkServersForWriter = bkZkServersForWriter;
-        // distributedlog side
-        this.dlZkServersForReader = dlZkServersForReader;
-        this.dlZkServersForWriter = dlZkServersForWriter;
-    }
-
-    /**
-     * Get the zookeeper servers used for bookkeeper by writers.
-     *
-     * @return zk servers used for bk for writers; null when the config was built from a uri only.
-     */
-    public String getBkZkServersForWriter() {
-        return bkZkServersForWriter;
-    }
-
-    /**
-     * Get the zookeeper servers used for bookkeeper by readers.
-     *
-     * @return zk servers used for bk for readers; null when the config was built from a uri only.
-     */
-    public String getBkZkServersForReader() {
-        return bkZkServersForReader;
-    }
-
-    /**
-     * Get the zookeeper servers used for distributedlog by writers.
-     *
-     * @return zk servers used for dl for writers
-     */
-    public String getDlZkServersForWriter() {
-        return dlZkServersForWriter;
-    }
-
-    /**
-     * Get the zookeeper servers used for distributedlog by readers.
-     *
-     * @return zk servers used for dl for readers
-     */
-    public String getDlZkServersForReader() {
-        return dlZkServersForReader;
-    }
-
-    /**
-     * Get the ledgers path used for bookkeeper.
-     *
-     * @return ledgers path for bk; null when the config was built from a uri only.
-     */
-    public String getBkLedgersPath() {
-        return bkLedgersPath;
-    }
-
-    /**
-     * Enable or disable the sanity check of the transaction id.
-     * The check is enabled unless this setter turns it off.
-     *
-     * @param enabled
-     *          flag to enable/disable sanity check txn id.
-     * @return this config, to allow chaining.
-     */
-    public BKDLConfig setSanityCheckTxnID(boolean enabled) {
-        this.sanityCheckTxnID = enabled;
-        return this;
-    }
-
-    /**
-     * Whether the transaction id is sanity checked.
-     *
-     * @return flag to sanity check highest txn id; true unless it was disabled.
-     */
-    public boolean getSanityCheckTxnID() {
-        return sanityCheckTxnID;
-    }
-
-    /**
-     * Enable or disable encoding the region id.
-     * Encoding is disabled unless this setter turns it on.
-     *
-     * @param enabled
-     *          flag to enable/disable encoding region id.
-     * @return this config, to allow chaining.
-     */
-    public BKDLConfig setEncodeRegionID(boolean enabled) {
-        this.encodeRegionID = enabled;
-        return this;
-    }
-
-    /**
-     * Whether the region id is encoded. {@code propagateConfiguration} copies this flag
-     * into the encode-region-id-in-log-segment-metadata setting of the DL configuration.
-     *
-     * @return flag to encode region id; false unless it was enabled.
-     */
-    public boolean getEncodeRegionID() {
-        return encodeRegionID;
-    }
-
-    /**
-     * Set the root path of the zookeeper based ACL manager.
-     *
-     * @param aclRootPath
-     *          root path of zk based ACL manager.
-     * @return this config, to allow chaining.
-     */
-    public BKDLConfig setACLRootPath(String aclRootPath) {
-        this.aclRootPath = aclRootPath;
-        return this;
-    }
-
-    /**
-     * Returns the configured root path of the zk based ACL manager.
-     *
-     * @return root path of zk based ACL manager.
-     */
-    public String getACLRootPath() {
-        return this.aclRootPath;
-    }
-
-    /**
-     * Set the value at which the ledger sequence number should start, both for streams that
-     * are being upgraded and did not have a ledger sequence number to start with, and for
-     * newly created streams.
-     *
-     * @param firstLogSegmentSeqNo first ledger sequence number
-     * @return this bk dl config, so calls can be chained.
-     */
-    public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
-        this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
-        return this;
-    }
-
-    /**
-     * Get the value at which the ledger sequence number should start, both for streams that
-     * are being upgraded and did not have a ledger sequence number to start with, and for
-     * newly created streams.
-     *
-     * <p>When no value has been set, {@link DistributedLogConstants#FIRST_LOGSEGMENT_SEQNO}
-     * is returned instead.
-     *
-     * @return first ledger sequence number
-     */
-    public Long getFirstLogSegmentSeqNo() {
-        if (null != firstLogSegmentSeqNo) {
-            return firstLogSegmentSeqNo;
-        }
-        // nothing configured: fall back to the default first sequence number
-        return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
-    }
-
-    /**
-     * Mark the namespace as federated or not, according to <i>isFederatedNamespace</i>.
-     *
-     * @param isFederatedNamespace
-     *          is the namespace federated?
-     * @return this bk dl config, so calls can be chained.
-     */
-    public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
-        this.isFederatedNamespace = isFederatedNamespace;
-        return this;
-    }
-
-    /**
-     * Whether the namespace is a federated namespace.
-     *
-     * @return true if the namespace is a federated namespace, otherwise false.
-     */
-    public boolean isFederatedNamespace() {
-        return isFederatedNamespace;
-    }
-
-    /**
-     * Hash over the zookeeper/bookkeeper connection settings only.
-     *
-     * <p>{@link #equals(Object)} compares more fields than are hashed here; that still
-     * satisfies the contract, since equal configs agree on these five fields.
-     */
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
-                                dlZkServersForWriter, dlZkServersForReader,
-                                bkLedgersPath);
-    }
-
-    /**
-     * Two configs are equal when all zookeeper/bookkeeper connection settings and all
-     * distributedlog settings (sanity check, region id encoding, ACL root path, first log
-     * segment sequence number, federated flag) match.
-     */
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof BKDLConfig)) {
-            return false;
-        }
-        BKDLConfig that = (BKDLConfig) o;
-        // zookeeper / bookkeeper connection settings
-        if (!Objects.equal(bkZkServersForWriter, that.bkZkServersForWriter)
-                || !Objects.equal(bkZkServersForReader, that.bkZkServersForReader)
-                || !Objects.equal(dlZkServersForWriter, that.dlZkServersForWriter)
-                || !Objects.equal(dlZkServersForReader, that.dlZkServersForReader)
-                || !Objects.equal(bkLedgersPath, that.bkLedgersPath)) {
-            return false;
-        }
-        // dl settings
-        return sanityCheckTxnID == that.sanityCheckTxnID
-                && encodeRegionID == that.encodeRegionID
-                && Objects.equal(aclRootPath, that.aclRootPath)
-                && Objects.equal(firstLogSegmentSeqNo, that.firstLogSegmentSeqNo)
-                && isFederatedNamespace == that.isFederatedNamespace;
-    }
-
-    /**
-     * The string form of this config is its serialized representation.
-     */
-    @Override
-    public String toString() {
-        return this.serialize();
-    }
-
-    /**
-     * Serialize this config by copying it into a {@link BKDLConfigFormat}.
-     *
-     * <p>String settings and the first log segment sequence number are only written when
-     * non-null; the federated flag is only written when it is <code>true</code>.
-     *
-     * @return the serialized form of this config.
-     */
-    @Override
-    public String serialize() {
-        BKDLConfigFormat format = new BKDLConfigFormat();
-        // bookkeeper cluster settings
-        if (bkZkServersForWriter != null) {
-            format.setBkZkServers(bkZkServersForWriter);
-        }
-        if (bkZkServersForReader != null) {
-            format.setBkZkServersForReader(bkZkServersForReader);
-        }
-        // dl zookeeper cluster settings
-        if (dlZkServersForWriter != null) {
-            format.setDlZkServersForWriter(dlZkServersForWriter);
-        }
-        if (dlZkServersForReader != null) {
-            format.setDlZkServersForReader(dlZkServersForReader);
-        }
-        if (bkLedgersPath != null) {
-            format.setBkLedgersPath(bkLedgersPath);
-        }
-        // dl settings
-        format.setSanityCheckTxnID(sanityCheckTxnID);
-        format.setEncodeRegionID(encodeRegionID);
-        if (aclRootPath != null) {
-            format.setAclRootPath(aclRootPath);
-        }
-        if (firstLogSegmentSeqNo != null) {
-            format.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
-        }
-        if (isFederatedNamespace) {
-            format.setFederatedNamespace(true);
-        }
-        return serialize(format);
-    }
-
-    /**
-     * Write the given config format through a thrift JSON protocol into an in-memory buffer
-     * and return the buffer content decoded as UTF-8.
-     *
-     * @param configFormat config format to serialize
-     * @return serialized config
-     * @throws RuntimeException if thrift fails to write the format or UTF-8 is unsupported
-     */
-    String serialize(BKDLConfigFormat configFormat) {
-        TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
-        TJSONProtocol jsonProtocol = new TJSONProtocol(buffer);
-        try {
-            configFormat.write(jsonProtocol);
-            buffer.flush();
-            return buffer.toString("UTF-8");
-        } catch (TException te) {
-            throw new RuntimeException("Failed to serialize BKDLConfig : ", te);
-        } catch (UnsupportedEncodingException uee) {
-            throw new RuntimeException("Failed to serialize BKDLConfig : ", uee);
-        }
-    }
-
-    /**
-     * Populate this config from its thrift JSON serialized form.
-     *
-     * <p>Defaults applied while reading:
-     * <ul>
-     * <li>reader zk servers (bk and dl) fall back to the corresponding writer zk servers;</li>
-     * <li>the txn id sanity check is enabled unless explicitly set;</li>
-     * <li>region id encoding and the federated flag are disabled unless explicitly set;</li>
-     * <li>other settings absent from <i>data</i> keep their current value.</li>
-     * </ul>
-     *
-     * @param data serialized config
-     * @throws IOException if the data cannot be parsed, or any of the zk servers / ledgers
-     *          path settings is still missing afterwards
-     */
-    @Override
-    public void deserialize(byte[] data) throws IOException {
-        BKDLConfigFormat format = new BKDLConfigFormat();
-        TJSONProtocol jsonProtocol = new TJSONProtocol(new TMemoryInputTransport(data));
-        try {
-            format.read(jsonProtocol);
-        } catch (TException te) {
-            throw new IOException("Failed to deserialize data '" +
-                    new String(data, UTF_8) + "' : ", te);
-        }
-
-        // bookkeeper cluster settings
-        if (format.isSetBkZkServers()) {
-            bkZkServersForWriter = format.getBkZkServers();
-        }
-        bkZkServersForReader = format.isSetBkZkServersForReader()
-                ? format.getBkZkServersForReader()
-                : bkZkServersForWriter;
-        if (format.isSetBkLedgersPath()) {
-            bkLedgersPath = format.getBkLedgersPath();
-        }
-
-        // dl zookeeper cluster settings
-        if (format.isSetDlZkServersForWriter()) {
-            dlZkServersForWriter = format.getDlZkServersForWriter();
-        }
-        dlZkServersForReader = format.isSetDlZkServersForReader()
-                ? format.getDlZkServersForReader()
-                : dlZkServersForWriter;
-
-        // dl settings
-        sanityCheckTxnID = format.isSetSanityCheckTxnID() ? format.isSanityCheckTxnID() : true;
-        encodeRegionID = format.isSetEncodeRegionID() ? format.isEncodeRegionID() : false;
-        if (format.isSetAclRootPath()) {
-            aclRootPath = format.getAclRootPath();
-        }
-        if (format.isSetFirstLogSegmentSeqNo()) {
-            firstLogSegmentSeqNo = format.getFirstLogSegmentSeqNo();
-        }
-        isFederatedNamespace = format.isSetFederatedNamespace() ? format.isFederatedNamespace() : false;
-
-        // Validate the settings
-        boolean missingBkSettings =
-                null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath;
-        boolean missingDlSettings =
-                null == dlZkServersForWriter || null == dlZkServersForReader;
-        if (missingBkSettings || missingDlSettings) {
-            throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
deleted file mode 100644
index c76a5a5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.LockCancelledException;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.distributedlog.zk.LimitedPermitManager;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.PermitManager;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static com.twitter.distributedlog.metadata.LogMetadata.*;
-
-/**
- * zookeeper based {@link LogStreamMetadataStore}
- */
-public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
-
-    private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
-
-    private final String clientId;
-    private final DistributedLogConfiguration conf;
-    private final ZooKeeperClient zooKeeperClient;
-    private final OrderedScheduler scheduler;
-    private final StatsLogger statsLogger;
-    private final LogSegmentMetadataStore logSegmentStore;
-    private final LimitedPermitManager permitManager;
-    // lock
-    private SessionLockFactory lockFactory;
-    private OrderedScheduler lockStateExecutor;
-
-    /**
-     * Build a zookeeper based log stream metadata store.
-     *
-     * <p>Besides keeping the given collaborators, this creates the log segment metadata
-     * store and the permit manager (used for log segment rolling), and registers the permit
-     * manager with the zookeeper client.
-     *
-     * @param clientId client id, handed to the lock factory
-     * @param conf distributedlog configuration
-     * @param zkc zookeeper client
-     * @param scheduler scheduler shared with the log segment store and the permit manager
-     * @param statsLogger stats logger
-     */
-    public ZKLogStreamMetadataStore(String clientId,
-                                    DistributedLogConfiguration conf,
-                                    ZooKeeperClient zkc,
-                                    OrderedScheduler scheduler,
-                                    StatsLogger statsLogger) {
-        this.clientId = clientId;
-        this.conf = conf;
-        this.zooKeeperClient = zkc;
-        this.scheduler = scheduler;
-        this.statsLogger = statsLogger;
-        // log segment metadata store
-        this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zkc, scheduler);
-        // permit manager (used for log segment rolling)
-        int rollingConcurrency = conf.getLogSegmentRollingConcurrency();
-        this.permitManager = new LimitedPermitManager(rollingConcurrency, 1, TimeUnit.MINUTES, scheduler);
-        zkc.register(this.permitManager);
-    }
-
-    /**
-     * Get the scheduler used for lock state changes, optionally creating it on first use.
-     *
-     * @param createIfNull whether to create the scheduler when it does not exist yet
-     * @return the lock state scheduler; <code>null</code> if it was never created and
-     *          <i>createIfNull</i> is false
-     */
-    private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
-        if (!createIfNull || null != lockStateExecutor) {
-            return lockStateExecutor;
-        }
-        StatsLogger schedulerStats = statsLogger.scope("lock_scheduler");
-        lockStateExecutor = OrderedScheduler.newBuilder()
-                .name("DLM-LockState")
-                .corePoolSize(conf.getNumLockStateThreads())
-                .statsLogger(schedulerStats)
-                .perExecutorStatsLogger(schedulerStats)
-                .traceTaskExecution(conf.getEnableTaskExecutionStats())
-                .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
-                .build();
-        return lockStateExecutor;
-    }
-
-    /**
-     * Get the session lock factory, optionally creating it (and the lock state scheduler it
-     * depends on) on first use.
-     *
-     * @param createIfNull whether to create the factory when it does not exist yet
-     * @return the lock factory; <code>null</code> if it was never created and
-     *          <i>createIfNull</i> is false
-     */
-    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
-        if (!createIfNull || null != lockFactory) {
-            return lockFactory;
-        }
-        OrderedScheduler lockExecutor = getLockStateExecutor(true);
-        lockFactory = new ZKSessionLockFactory(
-                zooKeeperClient,
-                clientId,
-                lockExecutor,
-                conf.getZKNumRetries(),
-                conf.getLockTimeoutMilliSeconds(),
-                conf.getZKRetryBackoffStartMillis(),
-                statsLogger);
-        return lockFactory;
-    }
-
-    /**
-     * Release the resources created by this store: unregister and close the permit manager,
-     * close the log segment metadata store, then shut down the lock state scheduler.
-     */
-    @Override
-    public void close() throws IOException {
-        zooKeeperClient.unregister(permitManager);
-        permitManager.close();
-        logSegmentStore.close();
-        // the scheduler is created lazily, so this may be null if no lock was ever created
-        // NOTE(review): presumably SchedulerUtils tolerates a null scheduler - confirm
-        OrderedScheduler lockExecutor = getLockStateExecutor(false);
-        SchedulerUtils.shutdownScheduler(
-                lockExecutor,
-                conf.getSchedulerShutdownTimeoutMs(),
-                TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return the log segment metadata store created along with this store.
-     */
-    @Override
-    public LogSegmentMetadataStore getLogSegmentMetadataStore() {
-        return this.logSegmentStore;
-    }
-
-    /**
-     * @return the permit manager created along with this store.
-     */
-    @Override
-    public PermitManager getPermitManager() {
-        return permitManager;
-    }
-
-    /**
-     * @return a new zookeeper transaction bound to this store's zookeeper client.
-     */
-    @Override
-    public Transaction<Object> newTransaction() {
-        Transaction<Object> txn = new ZKTransaction(zooKeeperClient);
-        return txn;
-    }
-
-    /**
-     * Check whether a log exists by syncing on its log segments path and then checking that
-     * the path exists.
-     *
-     * @param uri namespace uri
-     * @param logName name of the log
-     * @return a future satisfied with <code>null</code> when the log exists. It fails with
-     *          {@link LogNotFoundException} when the path is missing, {@link ZKException} on
-     *          any other zookeeper error, {@link DLInterruptedException} when interrupted
-     *          while obtaining the zookeeper handle, or the connection exception thrown by
-     *          the zookeeper client.
-     */
-    @Override
-    public Future<Void> logExists(URI uri, final String logName) {
-        final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
-                uri, logName, conf.getUnpartitionedStreamName());
-        final Promise<Void> result = new Promise<Void>();
-
-        final ZooKeeper zk;
-        try {
-            zk = zooKeeperClient.get();
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
-            result.setException(new DLInterruptedException("Interrupted while checking "
-                    + logSegmentsPath, ie));
-            return result;
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            result.setException(e);
-            return result;
-        }
-
-        // second step: the actual existence check, issued once the sync has completed
-        final AsyncCallback.StatCallback existsCallback = new AsyncCallback.StatCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, Stat stat) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    result.setValue(null);
-                } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    result.setException(new LogNotFoundException(
-                            String.format("Log %s does not exist or has been deleted", logName)));
-                } else {
-                    result.setException(new ZKException("Error on checking log existence for " + logName,
-                            KeeperException.create(KeeperException.Code.get(rc))));
-                }
-            }
-        };
-
-        // first step: sync the path
-        zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int syncRc, String path, Object syncCtx) {
-                if (KeeperException.Code.OK.intValue() == syncRc) {
-                    zk.exists(logSegmentsPath, false, existsCallback, null);
-                } else if (KeeperException.Code.NONODE.intValue() == syncRc) {
-                    result.setException(new LogNotFoundException(
-                            String.format("Log %s does not exist or has been deleted", logName)));
-                } else {
-                    result.setException(new ZKException("Error on checking log existence for " + logName,
-                            KeeperException.create(KeeperException.Code.get(syncRc))));
-                }
-            }
-        }, null);
-        return result;
-    }
-
-    //
-    // Create Write Lock
-    //
-
-    /**
-     * Create the write lock of a log, on the lock path taken from the given metadata.
-     *
-     * <p>The lock state scheduler and the lock factory are created on demand.
-     *
-     * @param metadata metadata of the log to lock for writing
-     * @return the write lock
-     */
-    @Override
-    public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
-        OrderedScheduler lockExecutor = getLockStateExecutor(true);
-        SessionLockFactory sessionLockFactory = getLockFactory(true);
-        String lockPath = metadata.getLockPath();
-        return new ZKDistributedLock(
-                lockExecutor,
-                sessionLockFactory,
-                lockPath,
-                conf.getLockTimeoutMilliSeconds(),
-                statsLogger);
-    }
-
-    //
-    // Create Read Lock
-    //
-
-    /**
-     * Make sure the read lock path exists, creating it (and missing parents below the log
-     * root path) if necessary. The log root path itself is never created here.
-     *
-     * @param logMetadata metadata of the log
-     * @param readLockPath read lock path to ensure
-     * @return a future satisfied once the path exists. It fails with
-     *          {@link LogNotFoundException} when zookeeper reports a missing node, with
-     *          {@link LockCancelledException} when the future is interrupted, and with the
-     *          matching connection / interruption / keeper exception otherwise.
-     */
-    private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
-                                                 final String readLockPath) {
-        final Promise<Void> result = new Promise<Void>();
-        result.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
-                FutureUtils.setException(result, new LockCancelledException(readLockPath,
-                        "Could not ensure read lock path", cause));
-                return null;
-            }
-        });
-
-        org.apache.zookeeper.AsyncCallback.StringCallback createCallback =
-                new org.apache.zookeeper.AsyncCallback.StringCallback() {
-                    @Override
-                    public void processResult(final int rc, final String path, Object ctx, String name) {
-                        if (KeeperException.Code.NONODE.intValue() == rc) {
-                            FutureUtils.setException(result, new LogNotFoundException(
-                                    String.format("Log %s does not exist or has been deleted",
-                                            logMetadata.getFullyQualifiedName())));
-                            return;
-                        }
-                        if (KeeperException.Code.OK.intValue() == rc) {
-                            FutureUtils.setValue(result, null);
-                            LOG.trace("Created path {}.", path);
-                            return;
-                        }
-                        if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                            FutureUtils.setValue(result, null);
-                            LOG.trace("Path {} is already existed.", path);
-                            return;
-                        }
-                        if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
-                            FutureUtils.setException(result, new ZooKeeperClient.ZooKeeperConnectionException(path));
-                            return;
-                        }
-                        if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-                            FutureUtils.setException(result, new DLInterruptedException(path));
-                            return;
-                        }
-                        FutureUtils.setException(result, KeeperException.create(KeeperException.Code.get(rc)));
-                    }
-                };
-
-        // the log root path must already exist; only paths below it are created
-        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
-        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
-                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
-                createCallback, null);
-        return result;
-    }
-
-    /**
-     * Create the read lock of a log for the given reader.
-     *
-     * <p>The read lock path is ensured first; the lock object is then built on the
-     * scheduler rather than on the thread completing the zookeeper call.
-     *
-     * @param metadata metadata of the log to lock for reading
-     * @param readerId optional reader id used to derive the read lock path
-     * @return a future of the read lock
-     */
-    @Override
-    public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata,
-                                                  Optional<String> readerId) {
-        final String readLockPath = metadata.getReadLockPath(readerId);
-
-        // builds the lock object; submitted to the scheduler below
-        final ExceptionalFunction0<DistributedLock> buildLock =
-                new ExceptionalFunction0<DistributedLock>() {
-            @Override
-            public DistributedLock applyE() throws Throwable {
-                return new ZKDistributedLock(
-                    getLockStateExecutor(true),
-                    getLockFactory(true),
-                    readLockPath,
-                    conf.getLockTimeoutMilliSeconds(),
-                    statsLogger.scope("read_lock"));
-            }
-        };
-
-        Future<Void> pathEnsured = ensureReadLockPathExist(metadata, readLockPath);
-        return pathEnsured.flatMap(
-                new ExceptionalFunction<Void, Future<DistributedLock>>() {
-            @Override
-            public Future<DistributedLock> applyE(Void ignored) throws Throwable {
-                // Unfortunately building the lock has a blocking call which we should not
-                // execute on the ZK completion thread
-                return scheduler.apply(buildLock);
-            }
-        });
-    }
-
-    //
-    // Create Log
-    //
-
-    /**
-     * Positions of the individual paths in the metadata lists handled by
-     * {@link #checkLogMetadataPaths}, {@link #createMissingMetadata} and
-     * {@link #processLogMetadatas}. The order here must match the order in which
-     * the paths are fetched.
-     */
-    static class MetadataIndex {
-        // parent of the log root path
-        static final int LOG_ROOT_PARENT = 0;
-        // log root path
-        static final int LOG_ROOT = 1;
-        // max txn id path
-        static final int MAX_TXID = 2;
-        // layout version path
-        static final int VERSION = 3;
-        // write lock path
-        static final int LOCK = 4;
-        // read lock path
-        static final int READ_LOCK = 5;
-        // log segments path
-        static final int LOGSEGMENTS = 6;
-        // allocation path; only present when the store owns the allocator
-        static final int ALLOCATION = 7;
-    }
-
-    /**
-     * Decode the first four bytes of <i>b</i> as a big-endian int; the inverse of
-     * {@link #intToBytes(int)}.
-     *
-     * @param b buffer holding at least four bytes
-     * @return the decoded int
-     */
-    static int bytesToInt(byte[] b) {
-        assert b.length >= 4;
-        // bytes are signed: mask each one so that sign extension of a byte with the high
-        // bit set cannot clobber the more significant bytes
-        return (b[0] & 0xff) << 24
-                | (b[1] & 0xff) << 16
-                | (b[2] & 0xff) << 8
-                | (b[3] & 0xff);
-    }
-
-    /**
-     * Encode an int as four bytes, most significant byte first.
-     *
-     * @param i value to encode
-     * @return a new four byte array
-     */
-    static byte[] intToBytes(int i) {
-        byte[] encoded = new byte[4];
-        encoded[0] = (byte) (i >>> 24);
-        encoded[1] = (byte) (i >>> 16);
-        encoded[2] = (byte) (i >>> 8);
-        encoded[3] = (byte) i;
-        return encoded;
-    }
-
-    /**
-     * Fetch the data of all metadata paths of a log in one go.
-     *
-     * <p>The paths are requested in {@link MetadataIndex} order, so the resulting list can
-     * be indexed with those constants. The allocation path is only fetched when
-     * <i>ownAllocator</i> is set.
-     *
-     * @param zk zookeeper handle
-     * @param logRootPath root path of the log
-     * @param ownAllocator whether the allocation path should be checked as well
-     * @return a future of the versioned data of every path
-     */
-    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
-                                                                 String logRootPath,
-                                                                 boolean ownAllocator) {
-        int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
-
-        // Note re. persistent lock state initialization: the read lock persistent state (path) is
-        // initialized here but only used in the read handler. The reason is that it is more
-        // convenient and less error prone to manage all stream structure in one place.
-        List<String> pathsToCheck = Lists.newArrayListWithExpectedSize(numPaths);
-        pathsToCheck.add(new File(logRootPath).getParent());
-        pathsToCheck.add(logRootPath);
-        pathsToCheck.add(logRootPath + MAX_TXID_PATH);
-        pathsToCheck.add(logRootPath + VERSION_PATH);
-        pathsToCheck.add(logRootPath + LOCK_PATH);
-        pathsToCheck.add(logRootPath + READ_LOCK_PATH);
-        pathsToCheck.add(logRootPath + LOGSEGMENTS_PATH);
-        if (ownAllocator) {
-            pathsToCheck.add(logRootPath + ALLOCATION_PATH);
-        }
-
-        List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
-        for (String pathToCheck : pathsToCheck) {
-            checkFutures.add(Utils.zkGetData(zk, pathToCheck, false));
-        }
-        return Future.collect(checkFutures);
-    }
-
-    /**
-     * A path is considered existing when both its value and its version were fetched.
-     *
-     * @param metadata versioned data fetched for a path
-     * @return true if value and version are both non-null
-     */
-    static boolean pathExists(Versioned<byte[]> metadata) {
-        if (null == metadata.getValue()) {
-            return false;
-        }
-        return null != metadata.getVersion();
-    }
-
-    /**
-     * Assert that both the value and the version of the given metadata are present.
-     *
-     * @param metadata versioned data fetched for a path
-     * @throws NullPointerException if the value or the version is null
-     */
-    static void ensureMetadataExist(Versioned<byte[]> metadata) {
-        byte[] value = metadata.getValue();
-        Preconditions.checkNotNull(value);
-        Preconditions.checkNotNull(metadata.getVersion());
-    }
-
-    /**
-     * Create, in a single zookeeper multi operation, every metadata path of a log that was
-     * found missing by {@link #checkLogMetadataPaths}.
-     *
-     * <p>The promise is satisfied with <i>metadatas</i> as is when nothing is missing.
-     * Otherwise, when creation is allowed and succeeds, it is satisfied with a list in which
-     * every created path carries the data written and version 0.
-     *
-     * @param zk zookeeper handle
-     * @param logRootPath root path of the log
-     * @param metadatas data fetched for each path, in {@link MetadataIndex} order
-     * @param acl acl applied to the created paths
-     * @param ownAllocator whether the allocation path is managed as well
-     * @param createIfNotExists whether missing paths may be created; if not, the promise
-     *          fails with {@link LogNotFoundException} when something is missing
-     * @param promise promise to satisfy with the final metadata. It fails with
-     *          {@link LogExistsException} when a path was created concurrently, and with
-     *          {@link ZKException} on any other zookeeper error
-     */
-    static void createMissingMetadata(final ZooKeeper zk,
-                                      final String logRootPath,
-                                      final List<Versioned<byte[]>> metadatas,
-                                      final List<ACL> acl,
-                                      final boolean ownAllocator,
-                                      final boolean createIfNotExists,
-                                      final Promise<List<Versioned<byte[]>>> promise) {
-        // pathsToCreate is kept in MetadataIndex order: null means the path already exists,
-        // otherwise it holds the data the path gets created with
-        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
-        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
-        CreateMode createMode = CreateMode.PERSISTENT;
-
-        // log root parent path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
-            pathsToCreate.add(null);
-        } else {
-            String logRootParentPath = new File(logRootPath).getParent();
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // log root path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // max id
-        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
-            pathsToCreate.add(zeroTxnIdData);
-            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
-        }
-        // version
-        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] versionData = intToBytes(LAYOUT_VERSION);
-            pathsToCreate.add(versionData);
-            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
-        }
-        // lock path
-        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // read lock path
-        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // log segments path
-        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
-                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
-            pathsToCreate.add(logSegmentsData);
-            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
-        }
-        // allocation path
-        if (ownAllocator) {
-            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
-                pathsToCreate.add(null);
-            } else {
-                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-            }
-        }
-        if (zkOps.isEmpty()) {
-            // nothing missed
-            promise.setValue(metadatas);
-            return;
-        }
-        if (!createIfNotExists) {
-            promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
-            return;
-        }
-
-        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    List<Versioned<byte[]>> finalMetadatas =
-                            Lists.newArrayListWithExpectedSize(metadatas.size());
-                    for (int i = 0; i < pathsToCreate.size(); i++) {
-                        byte[] dataCreated = pathsToCreate.get(i);
-                        if (null == dataCreated) {
-                            finalMetadatas.add(metadatas.get(i));
-                        } else {
-                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
-                        }
-                    }
-                    promise.setValue(finalMetadatas);
-                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    promise.setException(new LogExistsException("Someone just created log "
-                            + logRootPath));
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        // Guard against a missing or empty result list: an exception thrown
-                        // while building this debug message would escape the callback and
-                        // leave the promise unsatisfied forever.
-                        // NOTE(review): the client may hand over a null list when the whole
-                        // request fails (e.g. connection loss) - confirm against the client.
-                        StringBuilder builder = new StringBuilder();
-                        if (null != resultList) {
-                            for (OpResult result : resultList) {
-                                if (builder.length() > 0) {
-                                    builder.append(",");
-                                }
-                                if (result instanceof OpResult.ErrorResult) {
-                                    OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
-                                    builder.append(errorResult.getErr());
-                                } else {
-                                    builder.append(0);
-                                }
-                            }
-                        }
-                        String resultCodeList = builder.toString();
-                        LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
-                    }
-
-                    promise.setException(new ZKException("Failed to create log " + logRootPath,
-                            KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-    }
-
-    /**
-     * Validate the metadata entries fetched for a log and build the writer-side
-     * metadata from them.
-     *
-     * <p>Entries are looked up in <code>metadatas</code> by their <code>MetadataIndex</code>
-     * position. The max txid, layout version, lock, read lock and max log segment
-     * sequence number entries are always checked; the allocation entry is only
-     * checked when <code>ownAllocator</code> is set, otherwise an empty versioned
-     * value is used in its place.
-     *
-     * @param uri uri handed to the returned metadata
-     * @param logName name of the log, also used in error messages
-     * @param logIdentifier identifier handed to the returned metadata
-     * @param metadatas versioned metadata entries, indexed by <code>MetadataIndex</code>
-     * @param ownAllocator whether the allocation entry has to be checked and used
-     * @return metadata built from the max lssn, max txid and allocation entries
-     * @throws UnexpectedException if an entry fails the existence check, the stored layout
-     *          version differs from <code>LAYOUT_VERSION</code>, or the max sequence number
-     *          cannot be parsed
-     */
-    static LogMetadataForWriter processLogMetadatas(URI uri,
-                                                    String logName,
-                                                    String logIdentifier,
-                                                    List<Versioned<byte[]>> metadatas,
-                                                    boolean ownAllocator)
-            throws UnexpectedException {
-        try {
-            // max id
-            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
-            ensureMetadataExist(maxTxnIdData);
-            // version: check the version entry itself before its value is decoded
-            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
-            ensureMetadataExist(versionData);
-            Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
-            // lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
-            // read lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
-            // max lssn
-            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
-            ensureMetadataExist(maxLSSNData);
-            try {
-                // deserialized only to verify the stored value is well formed; the result is unused
-                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
-            } catch (NumberFormatException nfe) {
-                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
-            }
-            // allocation path
-            Versioned<byte[]>  allocationData;
-            if (ownAllocator) {
-                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
-                ensureMetadataExist(allocationData);
-            } else {
-                allocationData = new Versioned<byte[]>(null, null);
-            }
-            return new LogMetadataForWriter(uri, logName, logIdentifier,
-                    maxLSSNData, maxTxnIdData, allocationData);
-        } catch (IllegalArgumentException iae) {
-            // raised by the layout version precondition
-            throw new UnexpectedException("Invalid log " + logName, iae);
-        } catch (NullPointerException npe) {
-            throw new UnexpectedException("Invalid log " + logName, npe);
-        }
-    }
-
-    /**
-     * Look up the writer metadata of a log, optionally creating missing metadata first.
-     *
-     * <p>The log root path is validated up front; an invalid path yields a future failed with
-     * {@link InvalidStreamNameException}. Otherwise the metadata paths are checked, handed to
-     * <code>createMissingMetadata</code>, and the resulting entries are turned into a
-     * {@link LogMetadataForWriter} by <code>processLogMetadatas</code>.
-     *
-     * @param uri uri used to derive the log root path
-     * @param logName name of the log
-     * @param logIdentifier identifier of the log under its name
-     * @param zooKeeperClient client providing the zookeeper handle and the default ACL
-     * @param ownAllocator forwarded to the path check, the creation step and the final processing
-     * @param createIfNotExists forwarded to the creation step
-     * @return future of the writer metadata; failed if the path is invalid, the zookeeper handle
-     *          cannot be obtained, or any step of the chain fails
-     */
-    static Future<LogMetadataForWriter> getLog(final URI uri,
-                                               final String logName,
-                                               final String logIdentifier,
-                                               final ZooKeeperClient zooKeeperClient,
-                                               final boolean ownAllocator,
-                                               final boolean createIfNotExists) {
-        final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
-        try {
-            PathUtils.validatePath(logRootPath);
-        } catch (IllegalArgumentException invalidPath) {
-            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, invalidPath});
-            return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
-        }
-
-        try {
-            final ZooKeeper zk = zooKeeperClient.get();
-
-            // Step applied to the checked entries: hand them to createMissingMetadata,
-            // which is given the promise returned from this step.
-            final AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>> createMissing =
-                    new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
-                        @Override
-                        public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> checked) {
-                            Promise<List<Versioned<byte[]>>> created =
-                                    new Promise<List<Versioned<byte[]>>>();
-                            createMissingMetadata(zk, logRootPath, checked, zooKeeperClient.getDefaultACL(),
-                                    ownAllocator, createIfNotExists, created);
-                            return created;
-                        }
-                    };
-
-            // Final step: validate the entries and assemble the writer metadata.
-            final ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter> toWriterMetadata =
-                    new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() {
-                        @Override
-                        public LogMetadataForWriter applyE(List<Versioned<byte[]>> entries) throws DLException {
-                            return processLogMetadatas(uri, logName, logIdentifier, entries, ownAllocator);
-                        }
-                    };
-
-            return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                    .flatMap(createMissing)
-                    .map(toWriterMetadata);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
-                    KeeperException.Code.CONNECTIONLOSS));
-        } catch (InterruptedException ie) {
-            return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, ie));
-        }
-    }
-
-    /**
-     * Look up the writer metadata of a log, using the configured unpartitioned stream name
-     * as the log identifier and this store's zookeeper client.
-     *
-     * @param uri uri used to derive the log root path
-     * @param logName name of the log
-     * @param ownAllocator forwarded to the static lookup
-     * @param createIfNotExists forwarded to the static lookup
-     * @return future of the writer metadata
-     */
-    @Override
-    public Future<LogMetadataForWriter> getLog(final URI uri,
-                                               final String logName,
-                                               final boolean ownAllocator,
-                                               final boolean createIfNotExists) {
-        final String logIdentifier = conf.getUnpartitionedStreamName();
-        return getLog(uri, logName, logIdentifier, zooKeeperClient, ownAllocator, createIfNotExists);
-    }
-
-    //
-    // Delete Log
-    //
-
-    /**
-     * Recursively delete the stream path of a log from zookeeper.
-     *
-     * <p>Failures are never thrown to the caller; they are reported through the returned future.
-     *
-     * @param uri uri used to derive the log stream path
-     * @param logName name of the log to delete
-     * @return future satisfied with <code>null</code> once the delete callback reports OK, or
-     *          failed with a {@link ZKException} / {@link DLInterruptedException} otherwise
-     */
-    @Override
-    public Future<Void> deleteLog(URI uri, final String logName) {
-        final Promise<Void> promise = new Promise<Void>();
-        try {
-            String streamPath = LogMetadata.getLogStreamPath(uri, logName);
-            ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (KeeperException.Code.OK.intValue() != rc) {
-                        FutureUtils.setException(promise,
-                                new ZKException("Encountered zookeeper issue on deleting log stream "
-                                        + logName, KeeperException.Code.get(rc)));
-                        return;
-                    }
-                    FutureUtils.setValue(promise, null);
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
-                    + logName, KeeperException.Code.CONNECTIONLOSS));
-        } catch (InterruptedException e) {
-            // keep the interrupt as the cause so it is not lost from the reported failure
-            FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
-                    + logName, e));
-        } catch (KeeperException e) {
-            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
-                    + logName, e));
-        }
-        return promise;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
deleted file mode 100644
index 6b7a231..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.twitter.distributedlog.metadata.MetadataResolver;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Resolves {@link DLMetadata} for a uri by reading zookeeper nodes along the uri's path.
- */
-public class ZkMetadataResolver implements MetadataResolver {
-
-    private final ZooKeeperClient zkc;
-
-    public ZkMetadataResolver(ZooKeeperClient zkc) {
-        this.zkc = zkc;
-    }
-
-    /**
-     * Resolve the metadata bound to <code>uri</code>.
-     *
-     * <p>The path of the uri is probed from the full path towards shorter prefixes; the first
-     * node that exists and carries non-empty data is deserialized and returned.
-     *
-     * @param uri uri whose path is resolved
-     * @return metadata deserialized from the first non-empty node found
-     * @throws IOException if the path has no segments, a zookeeper read fails or is interrupted,
-     *          the data cannot be deserialized, or no probed node carries data
-     */
-    @Override
-    public DLMetadata resolve(URI uri) throws IOException {
-        String dlPath = uri.getPath();
-        PathUtils.validatePath(dlPath);
-        // Normal case the dl metadata is stored in the last segment
-        // so lookup last segment first.
-        String[] parts = StringUtils.split(dlPath, '/');
-        if (null == parts || 0 == parts.length) {
-            throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath);
-        }
-        // i is the number of leading segments kept; the last iteration (i == 0) keeps none
-        for (int i = parts.length; i >= 0; i--) {
-            String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i));
-            byte[] data;
-            try {
-                data = zkc.get().getData(pathToResolve, false, new Stat());
-            } catch (KeeperException.NoNodeException nne) {
-                // node is absent: fall back to the next shorter prefix
-                continue;
-            } catch (KeeperException ke) {
-                throw new IOException("Fail to resolve dl path : " + pathToResolve, ke);
-            } catch (InterruptedException ie) {
-                throw new IOException("Interrupted when resolving dl path : " + pathToResolve, ie);
-            }
-            if (null == data || data.length == 0) {
-                // node exists but holds no metadata: fall back to the next shorter prefix
-                continue;
-            }
-            try {
-                return DLMetadata.deserialize(uri, data);
-            } catch (IOException ioe) {
-                throw new IOException("Failed to deserialize uri : " + uri, ioe);
-            }
-        }
-        throw new IOException("No bkdl config bound under dl path : " + dlPath);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
deleted file mode 100644
index 7c5c2e4..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * The BookKeeper Based DistributedLog Implementation.
- */
-package com.twitter.distributedlog.impl;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
deleted file mode 100644
index b067ee9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.subscription;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-/**
- * Subscription state store that keeps one subscriber's commit position in a zookeeper node
- * and remembers the most recently advanced position in memory.
- */
-public class ZKSubscriptionStateStore implements SubscriptionStateStore {
-
-    static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class);
-
-    private final ZooKeeperClient zooKeeperClient;
-    private final String zkPath;
-    // position most recently accepted by advanceCommitPosition; null until the first advance
-    private final AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
-
-    public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
-        this.zooKeeperClient = zooKeeperClient;
-        this.zkPath = zkPath;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Get the last committed position stored for this subscription.
-     *
-     * <p>Answers from the in-memory position when one has been advanced through this
-     * instance; otherwise reads the position from zookeeper.
-     */
-    @Override
-    public Future<DLSN> getLastCommitPosition() {
-        // read the reference once so the check and the returned value agree
-        DLSN cached = lastCommittedPosition.get();
-        if (null != cached) {
-            return Future.value(cached);
-        }
-        return getLastCommitPositionFromZK();
-    }
-
-    /**
-     * Read the commit position from the zookeeper node, bypassing the in-memory position.
-     *
-     * <p>A missing node, or node data that cannot be deserialized, yields
-     * <code>DLSN.NonInclusiveLowerBound</code>; any other non-OK result code fails the future.
-     */
-    Future<DLSN> getLastCommitPositionFromZK() {
-        final Promise<DLSN> result = new Promise<DLSN>();
-        try {
-            logger.debug("Reading last commit position from path {}", zkPath);
-            zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
-                    if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(DLSN.NonInclusiveLowerBound);
-                    } else if (KeeperException.Code.OK.intValue() != rc) {
-                        result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
-                    } else {
-                        try {
-                            DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
-                            result.setValue(dlsn);
-                        } catch (Exception t) {
-                            logger.warn("Invalid last commit position found from path {}", zkPath, t);
-                            // invalid dlsn recorded in subscription state store
-                            result.setValue(DLSN.NonInclusiveLowerBound);
-                        }
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
-        } catch (InterruptedException ie) {
-            result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
-        }
-        return result;
-    }
-
-    /**
-     * Advances the position associated with the subscriber.
-     *
-     * <p>Positions that do not compare greater than the in-memory position are ignored.
-     * The in-memory position is updated before the zookeeper write is issued.
-     *
-     * @param newPosition - new commit position
-     */
-    @Override
-    public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
-        // read the reference once so the null check and the comparison see the same value
-        DLSN current = lastCommittedPosition.get();
-        if (null == current || newPosition.compareTo(current) > 0) {
-            lastCommittedPosition.set(newPosition);
-            return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
-                zkPath, newPosition.serialize().getBytes(Charsets.UTF_8),
-                zooKeeperClient.getDefaultACL(),
-                CreateMode.PERSISTENT);
-        }
-        return Future.Done();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
deleted file mode 100644
index 17ba943..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.subscription;
-
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * ZooKeeper Based Subscriptions Store.
- *
- * <p>Each subscriber is served by a {@link ZKSubscriptionStateStore} created for the path
- * <code>zkPath/subscriberId</code>; the per-subscriber stores are cached by subscriber id.
- */
-public class ZKSubscriptionsStore implements SubscriptionsStore {
-
-    private final ZooKeeperClient zkc;
-    private final String zkPath;
-    private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
-            new ConcurrentHashMap<String, ZKSubscriptionStateStore>();
-
-    public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) {
-        this.zkc = zkc;
-        this.zkPath = zkPath;
-    }
-
-    /**
-     * Return the cached state store of a subscriber, creating and caching one if absent.
-     * When another thread wins the insertion, the store created here is closed and the
-     * winner's store is returned.
-     */
-    private ZKSubscriptionStateStore getSubscriber(String subscriberId) {
-        ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
-        if (ss == null) {
-            ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
-                getSubscriberZKPath(subscriberId));
-            ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
-            if (oldSS == null) {
-                ss = newSS;
-            } else {
-                try {
-                    newSS.close();
-                } catch (IOException e) {
-                    // ignore the exception
-                }
-                ss = oldSS;
-            }
-        }
-        return ss;
-    }
-
-    private String getSubscriberZKPath(String subscriberId) {
-        return String.format("%s/%s", zkPath, subscriberId);
-    }
-
-    @Override
-    public Future<DLSN> getLastCommitPosition(String subscriberId) {
-        return getSubscriber(subscriberId).getLastCommitPosition();
-    }
-
-    /**
-     * Get the last commit positions of all subscribers listed under the store's path.
-     *
-     * @return future of a map from subscriber id to commit position; empty when the store's
-     *          path does not exist, failed when listing or any per-subscriber read fails
-     */
-    @Override
-    public Future<Map<String, DLSN>> getLastCommitPositions() {
-        final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
-        try {
-            this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                    if (KeeperException.Code.NONODE.intValue() == rc) {
-                        result.setValue(new HashMap<String, DLSN>());
-                    } else if (KeeperException.Code.OK.intValue() != rc) {
-                        result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
-                    } else {
-                        getLastCommitPositions(result, children);
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            result.setException(zkce);
-        } catch (InterruptedException ie) {
-            result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
-        }
-        return result;
-    }
-
-    /**
-     * Read the commit position of every given subscriber from zookeeper and complete
-     * <code>result</code> with the collected map, or with the failure of the collection.
-     */
-    private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
-                                        List<String> subscriberIds) {
-        List<Future<Pair<String, DLSN>>> futures =
-                new ArrayList<Future<Pair<String, DLSN>>>(subscriberIds.size());
-        for (String s : subscriberIds) {
-            final String subscriber = s;
-            Future<Pair<String, DLSN>> future =
-                // Get the last commit position from zookeeper
-                getSubscriber(subscriber).getLastCommitPositionFromZK().map(
-                        new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
-                            @Override
-                            public Pair<String, DLSN> apply(DLSN dlsn) {
-                                return Pair.of(subscriber, dlsn);
-                            }
-                        });
-            futures.add(future);
-        }
-        Future<List<Pair<String, DLSN>>> collected = Future.collect(futures);
-        collected.foreach(
-            new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
-                    Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
-                    for (Pair<String, DLSN> pair : subscriptions) {
-                        subscriptionMap.put(pair.getLeft(), pair.getRight());
-                    }
-                    result.setValue(subscriptionMap);
-                    return BoxedUnit.UNIT;
-                }
-            });
-        // Without this, a failed per-subscriber read would leave the result promise pending forever.
-        collected.onFailure(
-            new AbstractFunction1<Throwable, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Throwable cause) {
-                    result.setException(cause);
-                    return BoxedUnit.UNIT;
-                }
-            });
-    }
-
-    @Override
-    public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
-        return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
-    }
-
-    /**
-     * Drop the cached state store of a subscriber and delete its zookeeper node.
-     */
-    @Override
-    public Future<Boolean> deleteSubscriber(String subscriberId) {
-        subscribers.remove(subscriberId);
-        String path = getSubscriberZKPath(subscriberId);
-        return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
-    }
-
-    @Override
-    public void close() throws IOException {
-        // close every cached per-subscriber state store
-        for (SubscriptionStateStore store : subscribers.values()) {
-            store.close();
-        }
-    }
-
-}