You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:24 UTC

[19/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
deleted file mode 100644
index bb98e07..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.lock;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Factory to create zookeeper based locks.
- */
-public class ZKSessionLockFactory implements SessionLockFactory {
-
-    private final ZooKeeperClient zkc;
-    private final String clientId;
-    private final OrderedScheduler lockStateExecutor;
-    private final long lockOpTimeout;
-    private final int lockCreationRetries;
-    private final long zkRetryBackoffMs;
-
-    // Stats
-    private final StatsLogger lockStatsLogger;
-
-    public ZKSessionLockFactory(ZooKeeperClient zkc,
-                                String clientId,
-                                OrderedScheduler lockStateExecutor,
-                                int lockCreationRetries,
-                                long lockOpTimeout,
-                                long zkRetryBackoffMs,
-                                StatsLogger statsLogger) {
-        this.zkc = zkc;
-        this.clientId = clientId;
-        this.lockStateExecutor = lockStateExecutor;
-        this.lockCreationRetries = lockCreationRetries;
-        this.lockOpTimeout = lockOpTimeout;
-        this.zkRetryBackoffMs = zkRetryBackoffMs;
-
-        this.lockStatsLogger = statsLogger.scope("lock");
-    }
-
-    @Override
-    public Future<SessionLock> createLock(String lockPath,
-                                          DistributedLockContext context) {
-        AtomicInteger numRetries = new AtomicInteger(lockCreationRetries);
-        final AtomicReference<Throwable> interruptedException = new AtomicReference<Throwable>(null);
-        Promise<SessionLock> createPromise =
-                new Promise<SessionLock>(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                interruptedException.set(t);
-                return BoxedUnit.UNIT;
-            }
-        });
-        createLock(
-                lockPath,
-                context,
-                interruptedException,
-                numRetries,
-                createPromise,
-                0L);
-        return createPromise;
-    }
-
-    void createLock(final String lockPath,
-                    final DistributedLockContext context,
-                    final AtomicReference<Throwable> interruptedException,
-                    final AtomicInteger numRetries,
-                    final Promise<SessionLock> createPromise,
-                    final long delayMs) {
-        lockStateExecutor.schedule(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                if (null != interruptedException.get()) {
-                    createPromise.updateIfEmpty(new Throw<SessionLock>(interruptedException.get()));
-                    return;
-                }
-                try {
-                    SessionLock lock = new ZKSessionLock(
-                            zkc,
-                            lockPath,
-                            clientId,
-                            lockStateExecutor,
-                            lockOpTimeout,
-                            lockStatsLogger,
-                            context);
-                    createPromise.updateIfEmpty(new Return<SessionLock>(lock));
-                } catch (DLInterruptedException dlie) {
-                    // if the creation is interrupted, fail the promise with the exception without retrying.
-                    createPromise.updateIfEmpty(new Throw<SessionLock>(dlie));
-                    return;
-                } catch (IOException e) {
-                    if (numRetries.getAndDecrement() < 0) {
-                        createPromise.updateIfEmpty(new Throw<SessionLock>(e));
-                        return;
-                    }
-                    createLock(
-                            lockPath,
-                            context,
-                            interruptedException,
-                            numRetries,
-                            createPromise,
-                            zkRetryBackoffMs);
-                }
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
deleted file mode 100644
index 02d905d..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Distributed locking mechanism in distributedlog
- */
-package com.twitter.distributedlog.lock;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
deleted file mode 100644
index 81eb5ed..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-
-/**
- * An interface class to read the enveloped entry (serialized bytes of
- * {@link com.twitter.distributedlog.Entry}) from a log segment
- */
-@Beta
-public interface LogSegmentEntryReader extends AsyncCloseable {
-
-    interface StateChangeListener {
-
-        /**
-         * Notify when caught up on inprogress.
-         */
-        void onCaughtupOnInprogress();
-
-    }
-
-    /**
-     * Start the reader. This method signals the implementation
-     * to start preparing the data for consumption by {@link #readNext(int)}.
-     */
-    void start();
-
-    /**
-     * Register the state change listener
-     *
-     * @param listener register the state change listener
-     * @return entry reader
-     */
-    LogSegmentEntryReader registerListener(StateChangeListener listener);
-
-    /**
-     * Unregister the state change listener
-     *
-     * @param listener the state change listener to unregister
-     * @return entry reader
-     */
-    LogSegmentEntryReader unregisterListener(StateChangeListener listener);
-
-    /**
-     * Return the log segment metadata for this reader.
-     *
-     * @return the log segment metadata
-     */
-    LogSegmentMetadata getSegment();
-
-    /**
-     * Update the log segment each time when the metadata has changed.
-     *
-     * @param segment new metadata of the log segment.
-     */
-    void onLogSegmentMetadataUpdated(LogSegmentMetadata segment);
-
-    /**
-     * Read next <i>numEntries</i> entries from current log segment.
-     * <p>
-     * <i>numEntries</i> will be best-effort.
-     *
-     * @param numEntries num entries to read from current log segment
-     * @return A promise that when satisfied will contain a non-empty list of entries with their content.
-     * @throws com.twitter.distributedlog.exceptions.EndOfLogSegmentException when
-     *          read entries beyond the end of a <i>closed</i> log segment.
-     */
-    Future<List<Entry.Reader>> readNext(int numEntries);
-
-    /**
-     * Return the last add confirmed entry id (LAC).
-     *
-     * @return the last add confirmed entry id.
-     */
-    long getLastAddConfirmed();
-
-    /**
-     * Is the reader reading beyond last add confirmed.
-     *
-     * @return true if the reader is reading beyond last add confirmed
-     */
-    boolean isBeyondLastAddConfirmed();
-
-    /**
-     * Has the log segment reader caught up with the inprogress log segment.
-     *
-     * @return true only if the log segment is inprogress and it is caught up, otherwise return false.
-     */
-    boolean hasCaughtUpOnInprogress();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
deleted file mode 100644
index bcf8129..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * Log Segment Store to read log segments
- */
-@Beta
-public interface LogSegmentEntryStore {
-
-    /**
-     * Delete the actual log segment from the entry store.
-     *
-     * @param segment log segment metadata
-     * @return future representing the delete result
-     */
-    Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
-
-    /**
-     * Create a new log segment allocator for allocating log segment entry writers.
-     *
-     * @param metadata the metadata for the log stream
-     * @return the log segment allocator
-     */
-    Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
-            LogMetadataForWriter metadata,
-            DynamicDistributedLogConfiguration dynConf) throws IOException;
-
-    /**
-     * Open the reader for reading data from the log <i>segment</i>.
-     *
-     * @param segment the log <i>segment</i> to read data from
-     * @param startEntryId the start entry id
-     * @return future representing the opened reader
-     */
-    Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
-                                             long startEntryId);
-
-    /**
-     * Open the reader for reading entries from a random access log <i>segment</i>.
-     *
-     * @param segment the log <i>segment</i> to read entries from
-     * @param fence the flag to fence log segment
-     * @return future representing the opened random access reader
-     */
-    Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment,
-                                                                     boolean fence);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
deleted file mode 100644
index 8b7d9b2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.util.Sizable;
-import org.apache.bookkeeper.client.AsyncCallback;
-
-/**
- * An interface class to write the enveloped entry (serialized bytes of
- * {@link Entry}) into the log segment.
- *
- * <p>It is typically used by {@link LogSegmentWriter}.
- *
- * @see LogSegmentWriter
- *
- * TODO: The interface is leveraging bookkeeper's callback and status code now
- *       Consider making it more generic.
- */
-@Beta
-public interface LogSegmentEntryWriter extends Sizable {
-
-    /**
-     * Get the log segment id.
-     *
-     * @return log segment id.
-     */
-    long getLogSegmentId();
-
-    /**
-     * Close the entry writer.
-     */
-    void asyncClose(AsyncCallback.CloseCallback callback, Object ctx);
-
-    /**
-     * Async add entry to the log segment.
-     * <p>The implementation semantic follows
-     * {@link org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
-     * byte[], int, int, AsyncCallback.AddCallback, Object)}
-     *
-     * @param data
-     *          data to add
-     * @param offset
-     *          offset in the data
-     * @param length
-     *          length of the data
-     * @param callback
-     *          callback
-     * @param ctx
-     *          ctx
-     * @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
-     * byte[], int, int, AsyncCallback.AddCallback, Object)
-     */
-    void asyncAddEntry(byte[] data, int offset, int length,
-                       AsyncCallback.AddCallback callback, Object ctx);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
deleted file mode 100644
index f8bf183..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import java.util.Collection;
-
-/**
- * Filter to filter log segments
- */
-public interface LogSegmentFilter {
-
-    public static final LogSegmentFilter DEFAULT_FILTER = new LogSegmentFilter() {
-        @Override
-        public Collection<String> filter(Collection<String> fullList) {
-            return fullList;
-        }
-    };
-
-    /**
-     * Filter the log segments from the full log segment list.
-     *
-     * @param fullList
-     *          full list of log segment names.
-     * @return filtered log segment names
-     */
-    Collection<String> filter(Collection<String> fullList);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
deleted file mode 100644
index d4ca3ea..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Cache the log segment metadata
- */
-public class LogSegmentMetadataCache implements RemovalListener<String, LogSegmentMetadata> {
-
-    private static final Logger logger = LoggerFactory.getLogger(LogSegmentMetadataCache.class);
-
-    private final Cache<String, LogSegmentMetadata> cache;
-    private final boolean isCacheEnabled;
-
-    public LogSegmentMetadataCache(DistributedLogConfiguration conf,
-                                   Ticker ticker) {
-        cache = CacheBuilder.newBuilder()
-                .concurrencyLevel(conf.getNumWorkerThreads())
-                .initialCapacity(1024)
-                .expireAfterAccess(conf.getLogSegmentCacheTTLMs(), TimeUnit.MILLISECONDS)
-                .maximumSize(conf.getLogSegmentCacheMaxSize())
-                .removalListener(this)
-                .ticker(ticker)
-                .recordStats()
-                .build();
-        this.isCacheEnabled = conf.isLogSegmentCacheEnabled();
-        logger.info("Log segment cache is enabled = {}", this.isCacheEnabled);
-    }
-
-    /**
-     * Add the log <i>segment</i> of <i>path</i> to the cache.
-     *
-     * @param path the path of the log segment
-     * @param segment log segment metadata
-     */
-    public void put(String path, LogSegmentMetadata segment) {
-        if (isCacheEnabled) {
-            cache.put(path, segment);
-        }
-    }
-
-    /**
-     * Invalidate the cache entry associated with <i>path</i>.
-     *
-     * @param path the path of the log segment
-     */
-    public void invalidate(String path) {
-        if (isCacheEnabled) {
-            cache.invalidate(path);
-        }
-    }
-
-    /**
-     * Retrieve the log segment of <i>path</i> from the cache.
-     *
-     * @param path the path of the log segment.
-     * @return log segment metadata if exists, otherwise null.
-     */
-    public LogSegmentMetadata get(String path) {
-        return cache.getIfPresent(path);
-    }
-
-    @Override
-    public void onRemoval(RemovalNotification<String, LogSegmentMetadata> notification) {
-        if (notification.wasEvicted()) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Log segment of {} was evicted.", notification.getKey());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
deleted file mode 100644
index dda76e5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import java.io.Closeable;
-import java.util.List;
-
-/**
- * Interface for log segment metadata store. All operations that modify log segments should
- * be executed under a {@link Transaction}.
- */
-@Beta
-public interface LogSegmentMetadataStore extends Closeable {
-
-    /**
-     * Start the transaction on changing log segment metadata store.
-     *
-     * @return transaction of the log segment metadata store.
-     */
-    Transaction<Object> transaction();
-
-    // The reason to keep storing log segment sequence number & log record transaction id
-    // in this log segment metadata store interface is to share the transaction that used
-    // to start/complete log segment. It is a bit hard to separate them out right now.
-
-    /**
-     * Store the maximum log segment sequence number for the log stream described by <code>logMetadata</code>.
-     *
-     * @param txn
-     *          transaction to execute for storing log segment sequence number.
-     * @param logMetadata
-     *          metadata of the log stream
-     * @param sequenceNumber
-     *          log segment sequence number to store
-     * @param listener
-     *          listener on the result to this operation
-     */
-    void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                          LogMetadata logMetadata,
-                                          Versioned<Long> sequenceNumber,
-                                          OpListener<Version> listener);
-
-    /**
-     * Store the maximum transaction id for the log stream described by <code>logMetadata</code>.
-     *
-     * @param txn
-     *          transaction to execute for storing transaction id
-     * @param logMetadata
-     *          metadata of the log stream
-     * @param transactionId
-     *          transaction id to store
-     * @param listener
-     *          listener on the result to this operation
-     */
-    void storeMaxTxnId(Transaction<Object> txn,
-                       LogMetadataForWriter logMetadata,
-                       Versioned<Long> transactionId,
-                       OpListener<Version> listener);
-
-    /**
-     * Create a log segment <code>segment</code> under transaction <code>txn</code>.
-     *
-     * NOTE: this operation shouldn't be a blocking call, and it shouldn't execute the operation
-     *       immediately. The operation should be executed via {@link Transaction#execute()}.
-     *
-     * @param txn
-     *          transaction to execute for this operation
-     * @param segment
-     *          segment to create
-     * @param opListener
-     *          the listener on the operation result
-     */
-    void createLogSegment(Transaction<Object> txn,
-                          LogSegmentMetadata segment,
-                          OpListener<Void> opListener);
-
-    /**
-     * Delete a log segment <code>segment</code> under transaction <code>txn</code>.
-     *
-     * NOTE: this operation shouldn't be a blocking call, and it shouldn't execute the operation
-     *       immediately. The operation should be executed via {@link Transaction#execute()}.
-     *
-     * @param txn
-     *          transaction to execute for this operation
-     * @param segment
-     *          segment to delete
-     */
-    void deleteLogSegment(Transaction<Object> txn,
-                          LogSegmentMetadata segment,
-                          OpListener<Void> opListener);
-
-    /**
-     * Update a log segment <code>segment</code> under transaction <code>txn</code>.
-     *
-     * NOTE: this operation shouldn't be a blocking call, and it shouldn't execute the operation
-     *       immediately. The operation should be executed via {@link Transaction#execute()}.
-     *
-     * @param txn
-     *          transaction to execute for this operation
-     * @param segment
-     *          segment to update
-     */
-    void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
-
-    /**
-     * Retrieve the log segment associated with <code>logSegmentPath</code>.
-     *
-     * @param logSegmentPath
-     *          path to store log segment metadata
-     * @return future of the retrieved log segment metadata
-     */
-    Future<LogSegmentMetadata> getLogSegment(String logSegmentPath);
-
-    /**
-     * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i>
-     * for subsequent changes for the list of log segments.
-     *
-     * @param logSegmentsPath
-     *          path to store list of log segments
-     * @param listener
-     *          log segment listener on log segment changes
-     * @return future of the retrieved list of log segment names
-     */
-    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
-                                                       LogSegmentNamesListener listener);
-
-    /**
-     * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>.
-     *
-     * @param logSegmentsPath
-     *          log segments path
-     * @param listener
-     *          log segment listener on log segment changes
-     */
-    void unregisterLogSegmentListener(String logSegmentsPath,
-                                      LogSegmentNamesListener listener);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
deleted file mode 100644
index 70472ca..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-
-/**
- * An interface class to read entries {@link com.twitter.distributedlog.Entry}
- * from a random access log segment.
- */
-public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable {
-
-    /**
-     * Read entries [startEntryId, endEntryId] from a random access log segment.
-     *
-     * @param startEntryId start entry id
-     * @param endEntryId end entry id
-     * @return A promise that when satisfied will contain a list of entries of [startEntryId, endEntryId].
-     */
-    Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId);
-
-    /**
-     * Return the last add confirmed entry id (LAC).
-     *
-     * @return the last add confirmed entry id.
-     */
-    long getLastAddConfirmed();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
deleted file mode 100644
index a0b4610..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * An interface class to write log records into a log segment.
- */
-@Beta
-public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable {
-
-    /**
-     * Get the unique log segment id.
-     *
-     * @return log segment id.
-     */
-    public long getLogSegmentId();
-
-    /**
-     * Write a log record to a log segment.
-     *
-     * @param record single log record
-     * @return a future representing write result. A {@link DLSN} is returned if write succeeds,
-     *         otherwise, exceptions are returned.
-     * @throws com.twitter.distributedlog.exceptions.LogRecordTooLongException if log record is too long
-     * @throws com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException on invalid enveloped entry
-     * @throws LockingException if failed to acquire lock for the writer
-     * @throws BKTransmitException if failed to transmit data to bk
-     * @throws com.twitter.distributedlog.exceptions.WriteException if failed to write to bk
-     */
-    public Future<DLSN> asyncWrite(LogRecord record);
-
-    /**
-     * This isn't a simple synchronous version of {@code asyncWrite}. It has different semantics:
-     * this method only writes data to the buffer and flushes the buffer if needed.
-     *
-     * TODO: remove this method once the synchronous writer is rewritten on top of the asynchronous writer,
-     *       since these semantics should be provided at a higher level by just calling write & flush.
-     *
-     * @param record single log record
-     * @throws IOException if an error occurs while flushing the buffer.
-     * @see LogSegmentWriter#asyncWrite(LogRecord)
-     */
-    public void write(LogRecord record) throws IOException;
-
-    /**
-     * Transmit the buffered data, wait for it to be persisted, and return the last acknowledged
-     * transaction id.
-     *
-     * @return future representing the transmit result with last acknowledged transaction id.
-     */
-    public Future<Long> flush();
-
-    /**
-     * Commit the currently acknowledged data. It is the operation that follows {@link #flush()},
-     * and it makes all the acknowledged data visible to readers.
-     *
-     * @return future representing the commit result.
-     */
-    public Future<Long> commit();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
deleted file mode 100644
index 5f88c5a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Managing log segments in local cache.
- *
- * <p>
- * Caching of log segment metadata assumes that the data contained in the ZNodes for individual
- * log segments is never updated after creation, i.e. we never call setData. A log segment
- * is finalized by creating a new ZNode and deleting the in-progress node. This code will have
- * to change if we change that behavior.
- * </p>
- */
-public class PerStreamLogSegmentCache {
-
-    static final Logger LOG = LoggerFactory.getLogger(PerStreamLogSegmentCache.class);
-
-    protected final String streamName;
-    protected final boolean validateLogSegmentSequenceNumber;
-    protected final Map<String, LogSegmentMetadata> logSegments =
-            new HashMap<String, LogSegmentMetadata>();
-    protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments =
-            new ConcurrentHashMap<Long, LogSegmentMetadata>();
-
-    @VisibleForTesting
-    PerStreamLogSegmentCache(String streamName) {
-        this(streamName, true);
-    }
-
-    public PerStreamLogSegmentCache(String streamName,
-                                    boolean validateLogSegmentSequenceNumber) {
-        this.streamName = streamName;
-        this.validateLogSegmentSequenceNumber = validateLogSegmentSequenceNumber;
-    }
-
-    /**
-     * Retrieve log segments from the cache.
-     *
-     * - first sort the log segments in ascending order
-     * - do validation and assign corresponding sequence id
-     * - apply comparator after validation
-     *
-     * @param comparator
-     *          comparator to sort the returned log segments.
-     * @return list of sorted and filtered log segments.
-     * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap)
-     */
-    public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator)
-        throws UnexpectedException {
-        List<LogSegmentMetadata> segmentsToReturn;
-        synchronized (logSegments) {
-            segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size());
-            segmentsToReturn.addAll(logSegments.values());
-        }
-        Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR);
-
-        LogSegmentMetadata prevSegment = null;
-        if (validateLogSegmentSequenceNumber) {
-            // validate that consecutive log segments have contiguous ledger sequence numbers (no duplicates or gaps).
-            for (int i = 0; i < segmentsToReturn.size(); i++) {
-                LogSegmentMetadata segment = segmentsToReturn.get(i);
-
-                if (null != prevSegment
-                        && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
-                        && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
-                        && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) {
-                    LOG.error("{} found ledger sequence number gap between log segment {} and {}",
-                            new Object[] { streamName, prevSegment, segment });
-                    throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment "
-                            + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber());
-                }
-                prevSegment = segment;
-            }
-        }
-
-        prevSegment = null;
-        long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
-        for (int i = 0; i < segmentsToReturn.size(); i++) {
-                LogSegmentMetadata segment = segmentsToReturn.get(i);
-            // assign sequence id
-            if (!segment.isInProgress()) {
-                if (segment.supportsSequenceId()) {
-                    startSequenceId = segment.getStartSequenceId() + segment.getRecordCount();
-                    if (null != prevSegment && prevSegment.supportsSequenceId()
-                            && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) {
-                        LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}",
-                                new Object[] { streamName, segment, prevSegment });
-                    }
-                } else {
-                    startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
-                }
-            } else {
-                if (segment.supportsSequenceId()) {
-                    LogSegmentMetadata newSegment = segment.mutator()
-                            .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId)
-                            .build();
-                    segmentsToReturn.set(i, newSegment);
-                }
-
-                break;
-            }
-            prevSegment = segment;
-        }
-        if (comparator != LogSegmentMetadata.COMPARATOR) {
-            Collections.sort(segmentsToReturn, comparator);
-        }
-        return segmentsToReturn;
-    }
-
-    /**
-     * Add the segment <i>metadata</i> for <i>name</i> in the cache.
-     *
-     * @param name
-     *          segment name.
-     * @param metadata
-     *          segment metadata.
-     */
-    public void add(String name, LogSegmentMetadata metadata) {
-        synchronized (logSegments) {
-            if (!logSegments.containsKey(name)) {
-                logSegments.put(name, metadata);
-                LOG.info("{} added log segment ({} : {}) to cache.",
-                        new Object[]{ streamName, name, metadata });
-            }
-            LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLogSegmentId());
-            if (null == oldMetadata) {
-                lid2LogSegments.put(metadata.getLogSegmentId(), metadata);
-            } else {
-                if (oldMetadata.isInProgress() && !metadata.isInProgress()) {
-                    lid2LogSegments.put(metadata.getLogSegmentId(), metadata);
-                } else {
-                    lid2LogSegments.put(oldMetadata.getLogSegmentId(), oldMetadata);
-                }
-            }
-        }
-    }
-
-    /**
-     * Retrieve log segment <code>name</code> from the cache.
-     *
-     * @param name
-     *          name of the log segment.
-     * @return log segment metadata
-     */
-    public LogSegmentMetadata get(String name) {
-        synchronized (logSegments) {
-            return logSegments.get(name);
-        }
-    }
-
-    /**
-     * Update the log segment cache with removed/added segments.
-     *
-     * @param segmentsRemoved
-     *          segments that were removed
-     * @param segmentsAdded
-     *          segments that were added
-     */
-    public void update(Set<String> segmentsRemoved,
-                       Map<String, LogSegmentMetadata> segmentsAdded) {
-        synchronized (logSegments) {
-            for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) {
-                add(entry.getKey(), entry.getValue());
-            }
-            for (String segment : segmentsRemoved) {
-                remove(segment);
-            }
-        }
-    }
-
-    /**
-     * Diff the cached segments against the newly received segment list <code>segmentsReceived</code>.
-     *
-     * @param segmentsReceived
-     *          new received segment list
-     * @return segments added (left) and removed (right).
-     */
-    public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) {
-        Set<String> segmentsAdded;
-        Set<String> segmentsRemoved;
-        synchronized (logSegments) {
-            Set<String> segmentsCached = logSegments.keySet();
-            segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy();
-            segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy();
-        }
-        return Pair.of(segmentsAdded, segmentsRemoved);
-    }
-
-    /**
-     * Remove log segment <code>name</code> from the cache.
-     *
-     * @param name
-     *          name of the log segment.
-     * @return log segment metadata.
-     */
-    public LogSegmentMetadata remove(String name) {
-        synchronized (logSegments) {
-            LogSegmentMetadata metadata = logSegments.remove(name);
-            if (null != metadata) {
-                lid2LogSegments.remove(metadata.getLogSegmentId(), metadata);
-                LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata);
-            }
-            return metadata;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
deleted file mode 100644
index 0101bff..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-
-public interface RollingPolicy {
-    /**
-     * Determines if a rollover may be appropriate at this time.
-     *
-     * @param sizable
-     *          Any object that is sizable.
-     * @param lastRolloverTimeMs
-     *          last rolling time in millis.
-     * @return true if a rollover is required. otherwise, false.
-     */
-    boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
deleted file mode 100644
index 8b1fa0f..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-
-public class SizeBasedRollingPolicy implements RollingPolicy {
-
-    final long maxSize;
-
-    public SizeBasedRollingPolicy(long maxSize) {
-        this.maxSize = maxSize;
-    }
-
-    @Override
-    public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) {
-        return sizable.size() > maxSize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
deleted file mode 100644
index bc88720..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-import com.twitter.distributedlog.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TimeBasedRollingPolicy implements RollingPolicy {
-
-    final static Logger LOG = LoggerFactory.getLogger(TimeBasedRollingPolicy.class);
-
-    final long rollingIntervalMs;
-
-    public TimeBasedRollingPolicy(long rollingIntervalMs) {
-        this.rollingIntervalMs = rollingIntervalMs;
-    }
-
-    @Override
-    public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) {
-        long elapsedMs = Utils.elapsedMSec(lastRolloverTimeMs);
-        boolean shouldSwitch = elapsedMs > rollingIntervalMs;
-        if (shouldSwitch) {
-            LOG.debug("Last Finalize Time: {} elapsed time (MSec): {}", lastRolloverTimeMs,
-                      elapsedMs);
-        }
-        return shouldSwitch;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
deleted file mode 100644
index 0f5b877..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Log Segment Management
- */
-package com.twitter.distributedlog.logsegment;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
deleted file mode 100644
index 178074a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import java.io.IOException;
-
-/**
- * Specific config of a given implementation of DL
- */
-public interface DLConfig {
-    /**
-     * Serialize the dl config into a string.
-     */
-    public String serialize();
-
-    /**
-     * Deserialize the dl config from a byte array.
-     *
-     * @param data
-     *          bytes to deserialize the dl config from.
-     * @throws IOException if fail to deserialize the dl config string representation.
-     */
-    public void deserialize(byte[] data) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
deleted file mode 100644
index c0b5fb7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.URI;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Metadata of a given DL instance.
- */
-public class DLMetadata {
-
-    static final Logger LOG = LoggerFactory.getLogger(DLMetadata.class);
-
-    static final String LINE_SPLITTER = "\n";
-    static final String BK_DL_TYPE = "BKDL";
-    static final int METADATA_FORMAT_VERSION = 1;
-
-    // metadata format version
-    private int metadataFormatVersion = 0;
-    // underlying dl type
-    private String dlType;
-    // underlying dl config
-    private DLConfig dlConfig;
-
-    public DLMetadata(String dlType, DLConfig dlConfig) {
-        this(dlType, dlConfig, METADATA_FORMAT_VERSION);
-    }
-
-    DLMetadata(String dlType, DLConfig dlConfig, int metadataFormatVersion) {
-        this.dlType = dlType;
-        this.dlConfig = dlConfig;
-        this.metadataFormatVersion = metadataFormatVersion;
-    }
-
-    /**
-     * @return DL type
-     */
-    public String getDLType() {
-        return dlType;
-    }
-
-    /**
-     * @return DL Config
-     */
-    public DLConfig getDLConfig() {
-        return dlConfig;
-    }
-
-    /**
-     * Serialize the DL metadata into a byte array.
-     *
-     * @return bytes of DL metadata.
-     */
-    public byte[] serialize() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(metadataFormatVersion).append(LINE_SPLITTER);
-        sb.append(dlType).append(LINE_SPLITTER);
-        sb.append(dlConfig.serialize());
-        LOG.debug("Serialized dl metadata {}.", sb);
-        return sb.toString().getBytes(UTF_8);
-    }
-
-    @Override
-    public int hashCode() {
-        return dlType.hashCode() * 13 + dlConfig.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return new String(serialize(), UTF_8);
-    }
-
-    public void update(URI uri) throws IOException {
-        DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                .requestRateLimit(conf.getZKRequestRateLimit())
-                .zkAclId(conf.getZkAclId())
-                .uri(uri)
-                .build();
-        byte[] data = serialize();
-        try {
-            zkc.get().setData(uri.getPath(), data, -1);
-        } catch (KeeperException e) {
-            throw new IOException("Fail to update dl metadata " + new String(data, UTF_8)
-                    + " to uri " + uri, e);
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted when updating dl metadata "
-                    + new String(data, UTF_8) + " to uri " + uri, e);
-        } finally {
-            zkc.close();
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof DLMetadata)) {
-            return false;
-        }
-        DLMetadata other = (DLMetadata) o;
-        return dlType.equals(other.dlType) && dlConfig.equals(other.dlConfig);
-    }
-
-    public void create(URI uri) throws IOException {
-        DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                .requestRateLimit(conf.getZKRequestRateLimit())
-                .zkAclId(conf.getZkAclId())
-                .uri(uri)
-                .build();
-        byte[] data = serialize();
-        try {
-            Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data,
-                    zkc.getDefaultACL(), CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            throw new IOException("Fail to write dl metadata " + new String(data, UTF_8)
-                    +  " to uri " + uri, e);
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted when writing dl metadata " + new String(data, UTF_8)
-                    + " to uri " + uri, e);
-        } finally {
-            zkc.close();
-        }
-    }
-
-    public static void unbind(URI uri) throws IOException {
-        DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                .requestRateLimit(conf.getZKRequestRateLimit())
-                .zkAclId(conf.getZkAclId())
-                .uri(uri)
-                .build();
-        byte[] data = new byte[0];
-        try {
-            zkc.get().setData(uri.getPath(), data, -1);
-        } catch (KeeperException ke) {
-            throw new IOException("Fail to unbound dl metadata on uri " + uri, ke);
-        } catch (InterruptedException ie) {
-            throw new IOException("Interrupted when unbinding dl metadata on uri " + uri, ie);
-        } finally {
-            zkc.close();
-        }
-    }
-
-    /**
-     * Deserialize dl metadata of given <i>uri</i> from a given bytes array.
-     *
-     * @param uri
-     *          uri that stored dl metadata bindings
-     * @param data
-     *          bytes of dl metadata
-     * @return dl metadata
-     * @throws IOException if failed to parse the bytes array
-     */
-    public static DLMetadata deserialize(URI uri, byte[] data) throws IOException {
-        String metadata = new String(data, UTF_8);
-        LOG.debug("Parsing dl metadata {}.", metadata);
-        BufferedReader br = new BufferedReader(new StringReader(metadata));
-        String versionLine = br.readLine();
-        if (null == versionLine) {
-            throw new IOException("Empty DL Metadata.");
-        }
-        int version;
-        try {
-            version = Integer.parseInt(versionLine);
-        } catch (NumberFormatException nfe) {
-            version = -1;
-        }
-        if (METADATA_FORMAT_VERSION != version) {
-            throw new IOException("Metadata version not compatible. Expected "
-                    + METADATA_FORMAT_VERSION + " but got " + version);
-        }
-        String type = br.readLine();
-        if (!BK_DL_TYPE.equals(type)) {
-            throw new IOException("Invalid DL type : " + type);
-        }
-        DLConfig dlConfig = new BKDLConfig(uri);
-        StringBuilder sb = new StringBuilder();
-        String line;
-        while (null != (line = br.readLine())) {
-            sb.append(line);
-        }
-        dlConfig.deserialize(sb.toString().getBytes(UTF_8));
-        return new DLMetadata(type, dlConfig, version);
-    }
-
-    public static DLMetadata create(BKDLConfig bkdlConfig) {
-        return new DLMetadata(BK_DL_TYPE, bkdlConfig);
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
deleted file mode 100644
index b2a417e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-
-public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStoreUpdater {
-
-    public DryrunLogSegmentMetadataStoreUpdater(DistributedLogConfiguration conf,
-                                                LogSegmentMetadataStore metadataStore) {
-        super(conf, metadataStore);
-    }
-
-    @Override
-    public Transaction<Object> transaction() {
-        return new Transaction<Object>() {
-            @Override
-            public void addOp(Op<Object> operation) {
-                // no-op
-            }
-
-            @Override
-            public Future<Void> execute() {
-                return Future.Void();
-            }
-
-            @Override
-            public void abort(Throwable reason) {
-                // no-op
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
deleted file mode 100644
index c878d68..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import java.net.URI;
-
-/**
- * Class to represent the layout and metadata of the zookeeper-based log metadata
- */
-public class LogMetadata {
-
-    protected static String getLogComponentPath(URI uri, String logName, String logIdentifier, String component) {
-        return String.format("%s/%s/%s%s", uri.getPath(), logName, logIdentifier, component);
-    }
-
-    /**
-     * Get the top stream path for a given log.
-     *
-     * @param uri namespace to store the log
-     * @param logName name of the log
-     * @return top stream path
-     */
-    public static String getLogStreamPath(URI uri, String logName) {
-        return String.format("%s/%s", uri.getPath(), logName);
-    }
-
-    /**
-     * Get the log root path for a given log.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return log root path
-     */
-    public static String getLogRootPath(URI uri, String logName, String logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, "");
-    }
-
-    /**
-     * Get the logsegments root path for a given log.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return logsegments root path
-     */
-    public static String getLogSegmentsPath(URI uri, String logName, String logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, LOGSEGMENTS_PATH);
-    }
-
-    public static final int LAYOUT_VERSION = -1;
-    public final static String LOGSEGMENTS_PATH = "/ledgers";
-    public final static String VERSION_PATH = "/version";
-    // writer znodes
-    public final static String MAX_TXID_PATH = "/maxtxid";
-    public final static String LOCK_PATH = "/lock";
-    public final static String ALLOCATION_PATH = "/allocation";
-    // reader znodes
-    public final static String READ_LOCK_PATH = "/readLock";
-
-    protected final URI uri;
-    protected final String logName;
-    protected final String logIdentifier;
-
-    // Root path of the log
-    protected final String logRootPath;
-    // Components
-    protected final String logSegmentsPath;
-    protected final String lockPath;
-    protected final String maxTxIdPath;
-    protected final String allocationPath;
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     */
-    protected LogMetadata(URI uri,
-                          String logName,
-                          String logIdentifier) {
-        this.uri = uri;
-        this.logName = logName;
-        this.logIdentifier = logIdentifier;
-        this.logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
-        this.lockPath = logRootPath + LOCK_PATH;
-        this.maxTxIdPath = logRootPath + MAX_TXID_PATH;
-        this.allocationPath = logRootPath + ALLOCATION_PATH;
-    }
-
-    public URI getUri() {
-        return uri;
-    }
-
-    public String getLogName() {
-        return logName;
-    }
-
-    /**
-     * Get the root path of the log.
-     *
-     * @return root path of the log.
-     */
-    public String getLogRootPath() {
-        return logRootPath;
-    }
-
-    /**
-     * Get the root path for log segments.
-     *
-     * @return root path for log segments
-     */
-    public String getLogSegmentsPath() {
-        return this.logSegmentsPath;
-    }
-
-    /**
-     * Get the path for a log segment of the log.
-     *
-     * @param segmentName
-     *          segment name
-     * @return path for the log segment
-     */
-    public String getLogSegmentPath(String segmentName) {
-        return this.logSegmentsPath + "/" + segmentName;
-    }
-
-    public String getLockPath() {
-        return lockPath;
-    }
-
-    public String getMaxTxIdPath() {
-        return maxTxIdPath;
-    }
-
-    public String getAllocationPath() {
-        return allocationPath;
-    }
-
-    /**
-     * Get the fully qualified name of the log.
-     *
-     * @return fully qualified name
-     */
-    public String getFullyQualifiedName() {
-        return String.format("%s:%s", logName, logIdentifier);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
deleted file mode 100644
index ff6bfca..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.base.Optional;
-
-import java.net.URI;
-
-/**
- * Log Metadata for Reader
- */
-public class LogMetadataForReader extends LogMetadata {
-
-    /**
-     * Get the root path to store subscription infos of a log.
-     *
-     * @param uri
-     *          namespace of the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return subscribers root path
-     */
-    public static String getSubscribersPath(URI uri, String logName, String logIdentifier) {
-        return getLogComponentPath(uri, logName, logIdentifier, SUBSCRIBERS_PATH);
-    }
-
-    /**
-     * Get the path that stores subscription info for a <code>subscriberId</code> for a <code>log</code>.
-     *
-     * @param uri
-     *          namespace of the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @param subscriberId
-     *          subscriber id of the log
-     * @return subscriber's path
-     */
-    public static String getSubscriberPath(URI uri, String logName, String logIdentifier, String subscriberId) {
-        return String.format("%s/%s", getSubscribersPath(uri, logName, logIdentifier), subscriberId);
-    }
-
-    /**
-     * Create a metadata representation of a log for reader.
-     *
-     * @param uri
-     *          namespace to store the log
-     * @param logName
-     *          name of the log
-     * @param logIdentifier
-     *          identifier of the log
-     * @return metadata representation of a log for reader
-     */
-    public static LogMetadataForReader of(URI uri, String logName, String logIdentifier) {
-        return new LogMetadataForReader(uri, logName, logIdentifier);
-    }
-
-    final static String SUBSCRIBERS_PATH = "/subscribers";
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri           namespace to store the log
-     * @param logName       name of the log
-     * @param logIdentifier identifier of the log
-     */
-    private LogMetadataForReader(URI uri, String logName, String logIdentifier) {
-        super(uri, logName, logIdentifier);
-    }
-
-    /**
-     * Get the readlock path for the log or a subscriber of the log.
-     *
-     * @param subscriberId
-     *          subscriber id. it is optional.
-     * @return read lock path
-     */
-    public String getReadLockPath(Optional<String> subscriberId) {
-        if (subscriberId.isPresent()) {
-            return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + READ_LOCK_PATH;
-        } else {
-            return logRootPath + READ_LOCK_PATH;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
deleted file mode 100644
index 2284cbb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import org.apache.bookkeeper.versioning.Versioned;
-
-import java.net.URI;
-
-/**
- * Log Metadata for writer
- */
-public class LogMetadataForWriter extends LogMetadata {
-
-    private final Versioned<byte[]> maxLSSNData;
-    private final Versioned<byte[]> maxTxIdData;
-    private final Versioned<byte[]> allocationData;
-
-    /**
-     * metadata representation of a log
-     *
-     * @param uri           namespace to store the log
-     * @param logName       name of the log
-     * @param logIdentifier identifier of the log
-     */
-    public LogMetadataForWriter(URI uri,
-                                String logName,
-                                String logIdentifier,
-                                Versioned<byte[]> maxLSSNData,
-                                Versioned<byte[]> maxTxIdData,
-                                Versioned<byte[]> allocationData) {
-        super(uri, logName, logIdentifier);
-        this.maxLSSNData = maxLSSNData;
-        this.maxTxIdData = maxTxIdData;
-        this.allocationData = allocationData;
-    }
-
-    public Versioned<byte[]> getMaxLSSNData() {
-        return maxLSSNData;
-    }
-
-    public Versioned<byte[]> getMaxTxIdData() {
-        return maxTxIdData;
-    }
-
-    public Versioned<byte[]> getAllocationData() {
-        return allocationData;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
deleted file mode 100644
index 01dccb7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.util.Future;
-
-import java.net.URI;
-import java.util.Iterator;
-
-/**
- * Interface for log metadata store.
- */
-@Beta
-public interface LogMetadataStore {
-
-    /**
-     * Create a stream and return it is namespace location.
-     *
-     * @param logName
-     *          name of the log
-     * @return namespace location that stores this stream.
-     */
-    Future<URI> createLog(String logName);
-
-    /**
-     * Get the location of the log.
-     *
-     * @param logName
-     *          name of the log
-     * @return namespace location that stores this stream.
-     */
-    Future<Optional<URI>> getLogLocation(String logName);
-
-    /**
-     * Retrieves logs from the namespace.
-     *
-     * @return iterator of logs of the namespace.
-     */
-    Future<Iterator<String>> getLogs();
-
-    /**
-     * Register a namespace listener on streams changes.
-     *
-     * @param listener
-     *          namespace listener
-     */
-    void registerNamespaceListener(NamespaceListener listener);
-}