You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:35 UTC

[30/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
deleted file mode 100644
index e798a0f..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class DistributedLogConstants {
-    public static final byte[] EMPTY_BYTES = new byte[0];
-    public static final String SCHEME_PREFIX = "distributedlog";
-    public static final String BACKEND_BK = "bk";
-    public static final long INVALID_TXID = -999;
-    public static final long EMPTY_LOGSEGMENT_TX_ID = -99;
-    public static final long MAX_TXID = Long.MAX_VALUE;
-    public static final long SMALL_LOGSEGMENT_THRESHOLD = 10;
-    public static final int LOGSEGMENT_NAME_VERSION = 1;
-    public static final int FUTURE_TIMEOUT_IMMEDIATE = 0;
-    public static final int FUTURE_TIMEOUT_INFINITE = -1;
-    public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE;
-    public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE;
-    public static final long LOCK_OP_TIMEOUT_DEFAULT = 120;
-    public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120;
-    public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId";
-    public static final int LOCAL_REGION_ID = 0;
-    public static final long LOGSEGMENT_DEFAULT_STATUS = 0;
-    public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0;
-    public static final long UNASSIGNED_SEQUENCE_ID = -1L;
-    public static final long FIRST_LOGSEGMENT_SEQNO = 1;
-    public static final long UNRESOLVED_LEDGER_ID = -1;
-    public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1);
-    public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1;
-    public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2;
-
-    public static final String ALLOCATION_POOL_NODE = ".allocation_pool";
-    // log segment prefix
-    public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
-    public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
-    public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
-    static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
-    static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
-
-    // An ACL that gives all permissions to node creators and read permissions only to everyone else.
-    public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
-        ImmutableList.<ACL>builder()
-            .addAll(Ids.CREATOR_ALL_ACL)
-            .addAll(Ids.READ_ACL_UNSAFE)
-            .build();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
deleted file mode 100644
index 34cfb65..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A DistributedLogManager is responsible for managing a single place of storing
- * edit logs. It may correspond to multiple files, a backup node, etc.
- * Even when the actual underlying storage is rolled, or failed and restored,
- * each conceptual place of storage corresponds to exactly one instance of
- * this class, which is created when the EditLog is first opened.
- */
-public interface DistributedLogManager extends AsyncCloseable, Closeable {
-
-    /**
-     * Get the name of the stream managed by this log manager
-     * @return streamName
-     */
-    public String getStreamName();
-
-    /**
-     * Get the namespace driver used by this manager.
-     *
-     * @return the namespace driver
-     */
-    public NamespaceDriver getNamespaceDriver();
-
-    /**
-     * Get log segments.
-     *
-     * @return log segments
-     * @throws IOException
-     */
-    public List<LogSegmentMetadata> getLogSegments() throws IOException;
-
-    /**
-     * Register <i>listener</i> on log segment updates of this stream.
-     *
-     * @param listener
-     *          listener to receive the updated log segment list.
-     */
-    public void registerListener(LogSegmentListener listener) throws IOException ;
-
-    /**
-     * Unregister <i>listener</i> on log segment updates from this stream.
-     *
-     * @param listener
-     *          listener to stop receiving log segment list updates.
-     */
-    public void unregisterListener(LogSegmentListener listener);
-
-    /**
-     * Open async log writer to write records to the log stream.
-     *
-     * @return result represents the open result
-     */
-    public Future<AsyncLogWriter> openAsyncLogWriter();
-
-    /**
-     * Begin writing to the log stream identified by the name
-     *
-     * @return the writer interface to generate log records
-     */
-    public LogWriter startLogSegmentNonPartitioned() throws IOException;
-
-    /**
-     * Begin writing to the log stream identified by the name
-     *
-     * @return the writer interface to generate log records
-     */
-    // @Deprecated
-    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
-
-    /**
-     * Begin appending to the end of the log stream which is being treated as a sequence of bytes
-     *
-     * @return the writer interface to generate log records
-     */
-    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
-
-    /**
-     * Get a reader to read a log stream as a sequence of bytes
-     *
-     * @return the reader interface to read the log stream as bytes
-     */
-    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
-
-    /**
-     * Get the input stream starting with fromTxnId for the specified log
-     *
-     * @param fromTxnId - the first transaction id we want to read
-     * @return the stream starting with transaction fromTxnId
-     * @throws IOException if a stream cannot be found.
-     */
-    public LogReader getInputStream(long fromTxnId)
-        throws IOException;
-
-    public LogReader getInputStream(DLSN fromDLSN) throws IOException;
-
-    /**
-     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
-     *
-     * @param fromTxnId
-     *          transaction id to start reading from
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
-
-    /**
-     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
-     *
-     * @param fromDLSN
-     *          dlsn to start reading from
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
-
-    // @Deprecated
-    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
-
-    // @Deprecated
-    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
-
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
-
-    /**
-     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
-     * If two readers try to open using the same subscriberId, one will succeed, while the other
-     * will be blocked until it gets the lock.
-     *
-     * @param fromDLSN
-     *          start dlsn
-     * @param subscriberId
-     *          subscriber id
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
-
-    /**
-     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
-     * its last commit position recorded in the subscription store. If no last commit position is
-     * found in the subscription store, it will start reading from the head of the stream.
-     *
-     * If two readers try to open using the same subscriberId, one will succeed, while the other
-     * will be blocked until it gets the lock.
-     *
-     * @param subscriberId
-     *          subscriber id
-     * @return async log reader
-     */
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
-
-    /**
-     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
-     *
-     * @param transactionId
-     *          transaction id
-     * @return dlsn of first log record whose transaction id is not less than transactionId.
-     */
-    public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
-
-    /**
-     * Get the last log record in the stream
-     *
-     * @return the last log record in the stream
-     * @throws IOException if a stream cannot be found.
-     */
-    public LogRecordWithDLSN getLastLogRecord()
-        throws IOException;
-
-    /**
-     * Get the earliest Transaction Id available in the log
-     *
-     * @return earliest transaction id
-     * @throws IOException
-     */
-    public long getFirstTxId() throws IOException;
-
-    /**
-     * Get Latest Transaction Id in the log
-     *
-     * @return latest transaction id
-     * @throws IOException
-     */
-    public long getLastTxId() throws IOException;
-
-    /**
-     * Get Latest DLSN in the log
-     *
-     * @return last dlsn
-     * @throws IOException
-     */
-    public DLSN getLastDLSN() throws IOException;
-
-    /**
-     * Get Latest log record with DLSN in the log - async
-     *
-     * @return latest log record with DLSN
-     */
-    public Future<LogRecordWithDLSN> getLastLogRecordAsync();
-
-    /**
-     * Get Latest Transaction Id in the log - async
-     *
-     * @return latest transaction id
-     */
-    public Future<Long> getLastTxIdAsync();
-
-    /**
-     * Get first DLSN in the log.
-     *
-     * @return first dlsn in the stream
-     */
-    public Future<DLSN> getFirstDLSNAsync();
-
-    /**
-     * Get Latest DLSN in the log - async
-     *
-     * @return latest dlsn
-     */
-    public Future<DLSN> getLastDLSNAsync();
-
-    /**
-     * Get the number of log records in the active portion of the log
-     * Any log segments that have already been truncated will not be included
-     *
-     * @return number of log records
-     * @throws IOException
-     */
-    public long getLogRecordCount() throws IOException;
-
-    /**
-     * Get the number of log records in the active portion of the log - async.
-     * Any log segments that have already been truncated will not be included
-     *
-     * @param beginDLSN dlsn to start counting log records from
-     * @return future number of log records
-     */
-    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
-
-    /**
-     * Run recovery on the log.
-     *
-     * @throws IOException
-     */
-    public void recover() throws IOException;
-
-    /**
-     * Check if an end of stream marker was added to the stream
-     * A stream with an end of stream marker cannot be appended to
-     *
-     * @return true if the marker was added to the stream, false otherwise
-     * @throws IOException
-     */
-    public boolean isEndOfStreamMarked() throws IOException;
-
-    /**
-     * Delete the log.
-     *
-     * @throws IOException if the deletion fails
-     */
-    public void delete() throws IOException;
-
-    /**
-     * The DistributedLogManager may archive/purge any logs for transactionId
-     * less than minTxIdToKeep.
-     * This is to be used only when the client explicitly manages deletion. If
-     * the cleanup policy is based on sliding time window, then this method need
-     * not be called.
-     *
-     * @param minTxIdToKeep the earliest txid that must be retained
-     * @throws IOException if purging fails
-     */
-    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
-    /**
-     * Get the subscriptions store provided by the distributedlog manager.
-     *
-     * @return subscriptions store manages subscriptions for current stream.
-     */
-    public SubscriptionsStore getSubscriptionsStore();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
deleted file mode 100644
index bf315fc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * A set of {@link LogRecord}s.
- */
-public class Entry {
-
-    /**
-     * Create a new log record set.
-     *
-     * @param logName
-     *          name of the log
-     * @param initialBufferSize
-     *          initial buffer size
-     * @param envelopeBeforeTransmit
-     *          whether to envelope the buffer before transmitting it
-     * @param codec
-     *          compression codec
-     * @param statsLogger
-     *          stats logger to receive stats
-     * @return writer to build a log record set.
-     */
-    public static Writer newEntry(
-            String logName,
-            int initialBufferSize,
-            boolean envelopeBeforeTransmit,
-            CompressionCodec.Type codec,
-            StatsLogger statsLogger) {
-        return new EnvelopedEntryWriter(
-                logName,
-                initialBufferSize,
-                envelopeBeforeTransmit,
-                codec,
-                statsLogger);
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Build the record set object.
-     */
-    public static class Builder {
-
-        private long logSegmentSequenceNumber = -1;
-        private long entryId = -1;
-        private long startSequenceId = Long.MIN_VALUE;
-        private boolean envelopeEntry = true;
-        // input stream
-        private InputStream in = null;
-        // or bytes array
-        private byte[] data = null;
-        private int offset = -1;
-        private int length = -1;
-        private Optional<Long> txidToSkipTo = Optional.absent();
-        private Optional<DLSN> dlsnToSkipTo = Optional.absent();
-        private boolean deserializeRecordSet = true;
-
-        private Builder() {}
-
-        /**
-         * Reset the builder to its default values.
-         * NOTE(review): <code>deserializeRecordSet</code> is not reset by this method.
-         * @return builder
-         */
-        public Builder reset() {
-            logSegmentSequenceNumber = -1;
-            entryId = -1;
-            startSequenceId = Long.MIN_VALUE;
-            envelopeEntry = true;
-            // input stream
-            in = null;
-            // or bytes array
-            data = null;
-            offset = -1;
-            length = -1;
-            txidToSkipTo = Optional.absent();
-            dlsnToSkipTo = Optional.absent();
-            return this;
-        }
-
-        /**
-         * Set the segment info of the log segment that this record
-         * set belongs to.
-         *
-         * @param lssn
-         *          log segment sequence number
-         * @param startSequenceId
-         *          start sequence id of this log segment
-         * @return builder
-         */
-        public Builder setLogSegmentInfo(long lssn, long startSequenceId) {
-            this.logSegmentSequenceNumber = lssn;
-            this.startSequenceId = startSequenceId;
-            return this;
-        }
-
-        /**
-         * Set the entry id of this log record set.
-         *
-         * @param entryId
-         *          entry id assigned for this log record set.
-         * @return builder
-         */
-        public Builder setEntryId(long entryId) {
-            this.entryId = entryId;
-            return this;
-        }
-
-        /**
-         * Set whether this record set is enveloped or not.
-         *
-         * @param enabled
-         *          flag indicates whether this record set is enveloped or not.
-         * @return builder
-         */
-        public Builder setEnvelopeEntry(boolean enabled) {
-            this.envelopeEntry = enabled;
-            return this;
-        }
-
-        /**
-         * Set the serialized bytes data of this record set.
-         *
-         * @param data
-         *          serialized bytes data of this record set.
-         * @param offset
-         *          offset of the bytes data
-         * @param length
-         *          length of the bytes data
-         * @return builder
-         */
-        public Builder setData(byte[] data, int offset, int length) {
-            this.data = data;
-            this.offset = offset;
-            this.length = length;
-            return this;
-        }
-
-        /**
-         * Set the input stream of the serialized bytes data of this record set.
-         *
-         * @param in
-         *          input stream
-         * @return builder
-         */
-        public Builder setInputStream(InputStream in) {
-            this.in = in;
-            return this;
-        }
-
-        /**
-         * Set the record set starts from <code>dlsn</code>.
-         *
-         * @param dlsn
-         *          dlsn to skip to
-         * @return builder
-         */
-        public Builder skipTo(@Nullable DLSN dlsn) {
-            this.dlsnToSkipTo = Optional.fromNullable(dlsn);
-            return this;
-        }
-
-        /**
-         * Set the record set starts from <code>txid</code>.
-         *
-         * @param txid
-         *          txid to skip to
-         * @return builder
-         */
-        public Builder skipTo(long txid) {
-            this.txidToSkipTo = Optional.of(txid);
-            return this;
-        }
-
-        /**
-         * Enable/disable deserializing the record set.
-         *
-         * @param enabled
-         *          flag to enable/disable deserializing the record set.
-         * @return builder
-         */
-        public Builder deserializeRecordSet(boolean enabled) {
-            this.deserializeRecordSet = enabled;
-            return this;
-        }
-
-        public Entry build() {
-            Preconditions.checkNotNull(data, "Serialized data isn't provided");
-            Preconditions.checkArgument(offset >= 0 && length >= 0
-                    && (offset + length) <= data.length,
-                    "Invalid offset or length of serialized data");
-            return new Entry(
-                    logSegmentSequenceNumber,
-                    entryId,
-                    startSequenceId,
-                    envelopeEntry,
-                    deserializeRecordSet,
-                    data,
-                    offset,
-                    length,
-                    txidToSkipTo,
-                    dlsnToSkipTo);
-        }
-
-        public Entry.Reader buildReader() throws IOException {
-            Preconditions.checkArgument(data != null || in != null,
-                    "Serialized data or input stream isn't provided");
-            InputStream in;
-            if (null != this.in) {
-                in = this.in;
-            } else {
-                Preconditions.checkArgument(offset >= 0 && length >= 0
-                                && (offset + length) <= data.length,
-                        "Invalid offset or length of serialized data");
-                in = new ByteArrayInputStream(data, offset, length);
-            }
-            return new EnvelopedEntryReader(
-                    logSegmentSequenceNumber,
-                    entryId,
-                    startSequenceId,
-                    in,
-                    envelopeEntry,
-                    deserializeRecordSet,
-                    NullStatsLogger.INSTANCE);
-        }
-
-    }
-
-    private final long logSegmentSequenceNumber;
-    private final long entryId;
-    private final long startSequenceId;
-    private final boolean envelopedEntry;
-    private final boolean deserializeRecordSet;
-    private final byte[] data;
-    private final int offset;
-    private final int length;
-    private final Optional<Long> txidToSkipTo;
-    private final Optional<DLSN> dlsnToSkipTo;
-
-    private Entry(long logSegmentSequenceNumber,
-                  long entryId,
-                  long startSequenceId,
-                  boolean envelopedEntry,
-                  boolean deserializeRecordSet,
-                  byte[] data,
-                  int offset,
-                  int length,
-                  Optional<Long> txidToSkipTo,
-                  Optional<DLSN> dlsnToSkipTo) {
-        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
-        this.entryId = entryId;
-        this.startSequenceId = startSequenceId;
-        this.envelopedEntry = envelopedEntry;
-        this.deserializeRecordSet = deserializeRecordSet;
-        this.data = data;
-        this.offset = offset;
-        this.length = length;
-        this.txidToSkipTo = txidToSkipTo;
-        this.dlsnToSkipTo = dlsnToSkipTo;
-    }
-
-    /**
-     * Get raw data of this record set.
-     *
-     * @return raw data representation of this record set.
-     */
-    public byte[] getRawData() {
-        return data;
-    }
-
-    /**
-     * Create reader to iterate over this record set.
-     *
-     * @return reader to iterate over this record set.
-     * @throws IOException if the record set is invalid.
-     */
-    public Reader reader() throws IOException {
-        InputStream in = new ByteArrayInputStream(data, offset, length);
-        Reader reader = new EnvelopedEntryReader(
-                logSegmentSequenceNumber,
-                entryId,
-                startSequenceId,
-                in,
-                envelopedEntry,
-                deserializeRecordSet,
-                NullStatsLogger.INSTANCE);
-        if (txidToSkipTo.isPresent()) {
-            reader.skipTo(txidToSkipTo.get());
-        }
-        if (dlsnToSkipTo.isPresent()) {
-            reader.skipTo(dlsnToSkipTo.get());
-        }
-        return reader;
-    }
-
-    /**
-     * Writer to append {@link LogRecord}s to {@link Entry}.
-     */
-    public interface Writer extends EntryBuffer {
-
-        /**
-         * Write a {@link LogRecord} to this record set.
-         *
-         * @param record
-         *          record to write
-         * @param transmitPromise
-         *          callback for transmit result. the promise is only
-         *          satisfied when this record set is transmitted.
-         * @throws LogRecordTooLongException if the record is too long
-         * @throws WriteException when encountered exception writing the record
-         */
-        void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
-                throws LogRecordTooLongException, WriteException;
-
-        /**
-         * Reset the writer to write records.
-         */
-        void reset();
-
-    }
-
-    /**
-     * Reader to read {@link LogRecord}s from this record set.
-     */
-    public interface Reader {
-
-        /**
-         * Get the log segment sequence number.
-         *
-         * @return the log segment sequence number.
-         */
-        long getLSSN();
-
-        /**
-         * Return the entry id.
-         *
-         * @return the entry id.
-         */
-        long getEntryId();
-
-        /**
-         * Read next log record from this record set.
-         *
-         * @return next log record from this record set.
-         */
-        LogRecordWithDLSN nextRecord() throws IOException;
-
-        /**
-         * Skip the reader to the record whose transaction id is <code>txId</code>.
-         *
-         * @param txId
-         *          transaction id to skip to.
-         * @return true if skip succeeds, otherwise false.
-         * @throws IOException
-         */
-        boolean skipTo(long txId) throws IOException;
-
-        /**
-         * Skip the reader to the record whose DLSN is <code>dlsn</code>.
-         *
-         * @param dlsn
-         *          DLSN to skip to.
-         * @return true if skip succeeds, otherwise false.
-         * @throws IOException
-         */
-        boolean skipTo(DLSN dlsn) throws IOException;
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
deleted file mode 100644
index 394fbad..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.TransmitListener;
-
-import java.io.IOException;
-
-/**
- * Write representation of an {@link Entry}.
- * It is a buffer of log record set, used for transmission.
- */
-public interface EntryBuffer extends TransmitListener {
-
-    /**
-     * Return if this record set contains user records.
-     *
-     * @return true if this record set contains user records, otherwise
-     * return false.
-     */
-    boolean hasUserRecords();
-
-    /**
-     * Return number of records in current record set.
-     *
-     * @return number of records in current record set.
-     */
-    int getNumRecords();
-
-    /**
-     * Return number of bytes in current record set.
-     *
-     * @return number of bytes in current record set.
-     */
-    int getNumBytes();
-
-    /**
-     * Return max tx id in current record set.
-     *
-     * @return max tx id.
-     */
-    long getMaxTxId();
-
-    /**
-     * Get the buffer to transmit.
-     *
-     * @return the buffer to transmit.
-     * @throws InvalidEnvelopedEntryException if the record set buffer is invalid
-     * @throws IOException when encountered IOException during serialization
-     */
-    Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
deleted file mode 100644
index 0a15d29..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-/**
- * The position of an entry, identified by log segment sequence number and entry id.
- */
-class EntryPosition {
-
-    private long lssn;
-    private long entryId;
-
-    EntryPosition(long lssn, long entryId) {
-        this.lssn = lssn;
-        this.entryId = entryId;
-    }
-
-    public synchronized long getLogSegmentSequenceNumber() {
-        return lssn;
-    }
-
-    public synchronized long getEntryId() {
-        return entryId;
-    }
-
-    public synchronized boolean advance(long lssn, long entryId) {
-        if (lssn == this.lssn) {
-            if (entryId <= this.entryId) {
-                return false;
-            }
-            this.entryId = entryId;
-            return true;
-        } else if (lssn > this.lssn) {
-            this.lssn = lssn;
-            this.entryId = entryId;
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("(").append(lssn).append(", ").append(entryId).append(")");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
deleted file mode 100644
index 55d3be9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.io.CompressionUtils;
-import com.twitter.distributedlog.util.BitMaskUtils;
-
-/**
- * An enveloped entry written to BookKeeper.
- *
- * Data type in brackets. Interpretation should be on the basis of data types and not individual
- * bytes to honor Endianness.
- *
- * Entry Structure:
- * ---------------
- * Bytes 0                                  : Version (Byte)
- * Bytes 1 - (DATA = 1+Header.length-1)     : Header (Integer)
- * Bytes DATA - DATA+3                      : Payload Length (Integer)
- * BYTES DATA+4 - DATA+4+payload.length-1   : Payload (Byte[])
- *
- * V1 Header Structure: // Offsets relative to the start of the header.
- * -------------------
- * Bytes 0 - 3                              : Flags (Integer)
- * Bytes 4 - 7                              : Original payload size before compression (Integer)
- *
- *      Flags: // 32 Bits
- *      -----
- *      0 ... 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
- *                                      |_|
- *                                       |
- *                               Compression Type
- *
- *      Compression Type: // 2 Bits (Least significant)
- *      ----------------
- *      00      : No Compression
- *      01      : LZ4 Compression
- *      10      : Unused
- *      11      : Unused
- */
-public class EnvelopedEntry {
-
-    public static final int VERSION_LENGTH = 1; // One byte long
-    public static final byte VERSION_ONE = 1;
-
-    public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
-    public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
-    public static final byte CURRENT_VERSION = VERSION_ONE;
-
-    private final OpStatsLogger compressionStat;
-    private final OpStatsLogger decompressionStat;
-    private final Counter compressedEntryBytes;
-    private final Counter decompressedEntryBytes;
-    private final byte version;
-
-    private Header header = new Header();
-    private Payload payloadCompressed = new Payload();
-    private Payload payloadDecompressed = new Payload();
-
-    public EnvelopedEntry(byte version,
-                          StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
-        Preconditions.checkNotNull(statsLogger);
-        if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
-            throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
-                    + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
-        }
-        this.version = version;
-        this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
-        this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
-        this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
-        this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
-    }
-
-    /**
-     * @param statsLogger
-     *          Used for getting stats for (de)compression time
-     * @param compressionType
-     *          The compression type to use
-     * @param decompressed
-     *          The decompressed payload
-     *          NOTE: The size of the byte array passed as the decompressed payload can be larger
-     *                than the actual contents to be compressed.
-     */
-    public EnvelopedEntry(byte version,
-                          CompressionCodec.Type compressionType,
-                          byte[] decompressed,
-                          int length,
-                          StatsLogger statsLogger)
-            throws InvalidEnvelopedEntryException {
-        this(version, statsLogger);
-        Preconditions.checkNotNull(compressionType);
-        Preconditions.checkNotNull(decompressed);
-        Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);
-
-        this.header = new Header(compressionType, length);
-        this.payloadDecompressed = new Payload(length, decompressed);
-    }
-
-    private boolean isReady() {
-        return (header.ready && payloadDecompressed.ready);
-    }
-
-    @Compression
-    public void writeFully(DataOutputStream out) throws IOException {
-        Preconditions.checkNotNull(out);
-        if (!isReady()) {
-            throw new IOException("Entry not writable");
-        }
-        // Version
-        out.writeByte(version);
-        // Header
-        header.write(out);
-        // Compress
-        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
-        byte[] compressed = codec.compress(
-                payloadDecompressed.payload,
-                0,
-                payloadDecompressed.length,
-                compressionStat);
-        this.payloadCompressed = new Payload(compressed.length, compressed);
-        this.compressedEntryBytes.add(payloadCompressed.length);
-        this.decompressedEntryBytes.add(payloadDecompressed.length);
-        payloadCompressed.write(out);
-    }
-
-    @Compression
-    public void readFully(DataInputStream in) throws IOException {
-        Preconditions.checkNotNull(in);
-        // Make sure we're reading the right versioned entry.
-        byte version = in.readByte();
-        if (version != this.version) {
-            throw new IOException(String.format("Version mismatch while reading. Received: %d," +
-                    " Required: %d", version, this.version));
-        }
-        header.read(in);
-        payloadCompressed.read(in);
-        // Decompress
-        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
-        byte[] decompressed = codec.decompress(
-                payloadCompressed.payload,
-                0,
-                payloadCompressed.length,
-                header.decompressedSize,
-                decompressionStat);
-        this.payloadDecompressed = new Payload(decompressed.length, decompressed);
-        this.compressedEntryBytes.add(payloadCompressed.length);
-        this.decompressedEntryBytes.add(payloadDecompressed.length);
-    }
-
-    public byte[] getDecompressedPayload() throws IOException {
-        if (!isReady()) {
-            throw new IOException("Decompressed payload is not initialized");
-        }
-        return payloadDecompressed.payload;
-    }
-
-    public static class Header {
-        public static final int COMPRESSION_CODEC_MASK = 0x3;
-        public static final int COMPRESSION_CODEC_NONE = 0x0;
-        public static final int COMPRESSION_CODEC_LZ4 = 0x1;
-
-        private int flags = 0;
-        private int decompressedSize = 0;
-        private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;
-
-        // Whether this struct is ready for reading/writing.
-        private boolean ready = false;
-
-        // Used while reading.
-        public Header() {
-        }
-
-        public Header(CompressionCodec.Type compressionType,
-                      int decompressedSize) {
-            this.compressionType = compressionType;
-            this.decompressedSize = decompressedSize;
-            this.flags = 0;
-            switch (compressionType) {
-                case NONE:
-                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
-                                                        COMPRESSION_CODEC_NONE);
-                    break;
-                case LZ4:
-                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
-                                                        COMPRESSION_CODEC_LZ4);
-                    break;
-                default:
-                    throw new RuntimeException(String.format("Unknown Compression Type: %s",
-                                                             compressionType));
-            }
-            // This can now be written.
-            this.ready = true;
-        }
-
-        private void write(DataOutputStream out) throws IOException {
-            out.writeInt(flags);
-            out.writeInt(decompressedSize);
-        }
-
-        private void read(DataInputStream in) throws IOException {
-            this.flags = in.readInt();
-            int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
-            if (compressionType == COMPRESSION_CODEC_NONE) {
-                this.compressionType = CompressionCodec.Type.NONE;
-            } else if (compressionType == COMPRESSION_CODEC_LZ4) {
-                this.compressionType = CompressionCodec.Type.LZ4;
-            } else {
-                throw new IOException(String.format("Unsupported Compression Type: %s",
-                                                    compressionType));
-            }
-            this.decompressedSize = in.readInt();
-            // Values can now be read.
-            this.ready = true;
-        }
-    }
-
-    public static class Payload {
-        private int length = 0;
-        private byte[] payload = null;
-
-        // Whether this struct is ready for reading/writing.
-        private boolean ready = false;
-
-        // Used for reading
-        Payload() {
-        }
-
-        Payload(int length, byte[] payload) {
-            this.length = length;
-            this.payload = payload;
-            this.ready = true;
-        }
-
-        private void write(DataOutputStream out) throws IOException {
-            out.writeInt(length);
-            out.write(payload, 0, length);
-        }
-
-        private void read(DataInputStream in) throws IOException {
-            this.length = in.readInt();
-            this.payload = new byte[length];
-            in.readFully(payload);
-            this.ready = true;
-        }
-    }
-
-    /**
-     * Return an InputStream that reads from the provided InputStream, decompresses the data
-     * and returns a new InputStream wrapping the underlying payload.
-     *
-     * Note that src is modified by this call.
-     *
-     * @return
-     *      New Input stream with the underlying payload.
-     * @throws Exception
-     */
-    public static InputStream fromInputStream(InputStream src,
-                                              StatsLogger statsLogger) throws IOException {
-        src.mark(VERSION_LENGTH);
-        byte version = new DataInputStream(src).readByte();
-        src.reset();
-        EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
-        entry.readFully(new DataInputStream(src));
-        return new ByteArrayInputStream(entry.getDecompressedPayload());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
deleted file mode 100644
index 038bb18..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Record reader to read records from an enveloped entry buffer.
- */
-class EnvelopedEntryReader implements Entry.Reader, RecordStream {
-
-    private final long logSegmentSeqNo;
-    private final long entryId;
-    private final LogRecord.Reader reader;
-
-    // slot id
-    private long slotId = 0;
-
-    EnvelopedEntryReader(long logSegmentSeqNo,
-                         long entryId,
-                         long startSequenceId,
-                         InputStream in,
-                         boolean envelopedEntry,
-                         boolean deserializeRecordSet,
-                         StatsLogger statsLogger)
-            throws IOException {
-        this.logSegmentSeqNo = logSegmentSeqNo;
-        this.entryId = entryId;
-        InputStream src = in;
-        if (envelopedEntry) {
-            src = EnvelopedEntry.fromInputStream(in, statsLogger);
-        }
-        this.reader = new LogRecord.Reader(
-                this,
-                new DataInputStream(src),
-                startSequenceId,
-                deserializeRecordSet);
-    }
-
-    @Override
-    public long getLSSN() {
-        return logSegmentSeqNo;
-    }
-
-    @Override
-    public long getEntryId() {
-        return entryId;
-    }
-
-    @Override
-    public LogRecordWithDLSN nextRecord() throws IOException {
-        return reader.readOp();
-    }
-
-    @Override
-    public boolean skipTo(long txId) throws IOException {
-        return reader.skipTo(txId, true);
-    }
-
-    @Override
-    public boolean skipTo(DLSN dlsn) throws IOException {
-        return reader.skipTo(dlsn);
-    }
-
-    //
-    // Record Stream
-    //
-
-    @Override
-    public void advance(int numRecords) {
-        slotId += numRecords;
-    }
-
-    @Override
-    public DLSN getCurrentPosition() {
-        return new DLSN(logSegmentSeqNo, entryId, slotId);
-    }
-
-    @Override
-    public String getName() {
-        return "EnvelopedReader";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
deleted file mode 100644
index 01a91ab..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.Entry.Writer;
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.WriteCancelledException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-
-/**
- * {@link com.twitter.distributedlog.io.Buffer} based log record set writer.
- */
-class EnvelopedEntryWriter implements Writer {
-
-    static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class);
-
-    private static class WriteRequest {
-
-        private final int numRecords;
-        private final Promise<DLSN> promise;
-
-        WriteRequest(int numRecords, Promise<DLSN> promise) {
-            this.numRecords = numRecords;
-            this.promise = promise;
-        }
-
-    }
-
-    private final String logName;
-    private final Buffer buffer;
-    private final LogRecord.Writer writer;
-    private final List<WriteRequest> writeRequests;
-    private final boolean envelopeBeforeTransmit;
-    private final CompressionCodec.Type codec;
-    private final StatsLogger statsLogger;
-    private int count = 0;
-    private boolean hasUserData = false;
-    private long maxTxId = Long.MIN_VALUE;
-
-    EnvelopedEntryWriter(String logName,
-                         int initialBufferSize,
-                         boolean envelopeBeforeTransmit,
-                         CompressionCodec.Type codec,
-                         StatsLogger statsLogger) {
-        this.logName = logName;
-        this.buffer = new Buffer(initialBufferSize * 6 / 5);
-        this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
-        this.writeRequests = new LinkedList<WriteRequest>();
-        this.envelopeBeforeTransmit = envelopeBeforeTransmit;
-        this.codec = codec;
-        this.statsLogger = statsLogger;
-    }
-
-    @Override
-    public synchronized void reset() {
-        cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
-        count = 0;
-        this.buffer.reset();
-    }
-
-    @Override
-    public synchronized void writeRecord(LogRecord record,
-                                         Promise<DLSN> transmitPromise)
-            throws LogRecordTooLongException, WriteException {
-        int logRecordSize = record.getPersistentSize();
-        if (logRecordSize > MAX_LOGRECORD_SIZE) {
-            throw new LogRecordTooLongException(
-                    "Log Record of size " + logRecordSize + " written when only "
-                            + MAX_LOGRECORD_SIZE + " is allowed");
-        }
-
-        try {
-            this.writer.writeOp(record);
-            int numRecords = 1;
-            if (!record.isControl()) {
-                hasUserData = true;
-            }
-            if (record.isRecordSet()) {
-                numRecords = LogRecordSet.numRecords(record);
-            }
-            count += numRecords;
-            writeRequests.add(new WriteRequest(numRecords, transmitPromise));
-            maxTxId = Math.max(maxTxId, record.getTransactionId());
-        } catch (IOException e) {
-            logger.error("Failed to append record to record set of {} : ",
-                    logName, e);
-            throw new WriteException(logName, "Failed to append record to record set of "
-                    + logName);
-        }
-    }
-
-    private synchronized void satisfyPromises(long lssn, long entryId) {
-        long nextSlotId = 0;
-        for (WriteRequest request : writeRequests) {
-            request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
-            nextSlotId += request.numRecords;
-        }
-        writeRequests.clear();
-    }
-
-    private synchronized void cancelPromises(Throwable reason) {
-        for (WriteRequest request : writeRequests) {
-            request.promise.setException(reason);
-        }
-        writeRequests.clear();
-    }
-
-    @Override
-    public synchronized long getMaxTxId() {
-        return maxTxId;
-    }
-
-    @Override
-    public synchronized boolean hasUserRecords() {
-        return hasUserData;
-    }
-
-    @Override
-    public int getNumBytes() {
-        return buffer.size();
-    }
-
-    @Override
-    public synchronized int getNumRecords() {
-        return count;
-    }
-
-    @Override
-    public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
-        if (!envelopeBeforeTransmit) {
-            return buffer;
-        }
-        // We can't escape this allocation because things need to be read from one byte array
-        // and then written to another. This is the destination.
-        Buffer toSend = new Buffer(buffer.size());
-        byte[] decompressed = buffer.getData();
-        int length = buffer.size();
-        EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                  codec,
-                                                  decompressed,
-                                                  length,
-                                                  statsLogger);
-        // This will cause an allocation of a byte[] for compression. This can be avoided
-        // but we can do that later only if needed.
-        entry.writeFully(new DataOutputStream(toSend));
-        return toSend;
-    }
-
-    @Override
-    public synchronized DLSN finalizeTransmit(long lssn, long entryId) {
-        return new DLSN(lssn, entryId, count - 1);
-    }
-
-    @Override
-    public void completeTransmit(long lssn, long entryId) {
-        satisfyPromises(lssn, entryId);
-    }
-
-    @Override
-    public void abortTransmit(Throwable reason) {
-        cancelPromises(reason);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
deleted file mode 100644
index 550d314..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LedgerReadPosition {
-    static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class);
-
-    private static enum PartialOrderingComparisonResult {
-        NotComparable,
-        GreaterThan,
-        LessThan,
-        EqualTo
-    }
-
-    long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID;
-    long logSegmentSequenceNo;
-    long entryId;
-
-    public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) {
-        this.ledgerId = ledgerId;
-        this.logSegmentSequenceNo = logSegmentSequenceNo;
-        this.entryId = entryId;
-    }
-
-    public LedgerReadPosition(LedgerReadPosition that) {
-        this.ledgerId = that.ledgerId;
-        this.logSegmentSequenceNo = that.logSegmentSequenceNo;
-        this.entryId = that.entryId;
-    }
-
-
-    public LedgerReadPosition(final DLSN dlsn) {
-        this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId());
-    }
-
-    public LedgerReadPosition(long logSegmentSequenceNo, long entryId) {
-        this.logSegmentSequenceNo = logSegmentSequenceNo;
-        this.entryId = entryId;
-    }
-
-    public long getLedgerId() {
-        if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) {
-            LOG.trace("Ledger Id is not initialized");
-            throw new IllegalStateException("Ledger Id is not initialized");
-        }
-        return ledgerId;
-    }
-
-    public long getLogSegmentSequenceNumber() {
-        return logSegmentSequenceNo;
-    }
-
-    public long getEntryId() {
-        return entryId;
-    }
-
-    public void advance() {
-        entryId++;
-    }
-
-    public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) {
-        this.ledgerId = ledgerId;
-        this.logSegmentSequenceNo = logSegmentSequenceNo;
-        this.entryId = 0L;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId);
-    }
-
-    public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) {
-        PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
-        return ((result == PartialOrderingComparisonResult.LessThan) ||
-            (result == PartialOrderingComparisonResult.EqualTo));
-    }
-
-    public boolean definitelyLessThan(LedgerReadPosition threshold) {
-        PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
-        return result == PartialOrderingComparisonResult.LessThan;
-    }
-
-    private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) {
-        // If no threshold is passed we cannot make a definitive comparison
-        if (null == threshold) {
-            return PartialOrderingComparisonResult.NotComparable;
-        }
-
-        if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) {
-            if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) {
-                return PartialOrderingComparisonResult.LessThan;
-            } else {
-                return PartialOrderingComparisonResult.GreaterThan;
-            }
-        } else if (this.ledgerId != threshold.ledgerId) {
-            // When logSegmentSequenceNo is equal we cannot definitely say that this
-            // position is less than the threshold unless ledgerIds are equal
-            // since LogSegmentSequenceNumber maybe inferred from transactionIds in older
-            // versions of the metadata.
-            return PartialOrderingComparisonResult.NotComparable;
-        } else if (this.getEntryId() < threshold.getEntryId()) {
-            return PartialOrderingComparisonResult.LessThan;
-        } else if (this.getEntryId() > threshold.getEntryId()) {
-            return PartialOrderingComparisonResult.GreaterThan;
-        } else {
-            return PartialOrderingComparisonResult.EqualTo;
-        }
-    }
-
-    /**
-     * Comparator for the key portion
-     */
-    public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator();
-
-    // Only compares the key portion
-    @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof LedgerReadPosition)) {
-            return false;
-        }
-        LedgerReadPosition key = (LedgerReadPosition) other;
-        return ledgerId == key.ledgerId &&
-            entryId == key.entryId;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (ledgerId * 13 ^ entryId * 17);
-    }
-
-    /**
-     * Compare EntryKey.
-     */
-    protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable {
-
-        private static final long serialVersionUID = 0L;
-
-        @Override
-        public int compare(LedgerReadPosition left, LedgerReadPosition right) {
-            long ret = left.ledgerId - right.ledgerId;
-            if (ret == 0) {
-                ret = left.entryId - right.entryId;
-            }
-            return (ret < 0) ? -1 : ((ret > 0) ? 1 : 0);
-        }
-    }
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
deleted file mode 100644
index f4a1e41..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class for setting up bookkeeper ensembles
- * and bringing individual bookies up and down
- */
-public class LocalDLMEmulator {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class);
-
-    // Namespace path appended to the zookeeper ensemble address to form distributedlog URIs.
-    public static final String DLOG_NAMESPACE = "/messaging/distributedlog";
-
-    // Defaults used by the Builder when a setting is not overridden.
-    private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports
-    private static final int DEFAULT_ZK_TIMEOUT_SEC = 10;
-    private static final int DEFAULT_ZK_PORT = 2181;
-    private static final String DEFAULT_ZK_HOST = "127.0.0.1";
-    // "host:port" form of the default zookeeper address; used by createDLMURI(String).
-    private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT;
-    private static final int DEFAULT_NUM_BOOKIES = 3;
-    // Template that Builder.build() clones when no server configuration is supplied.
-    private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration();
-
-    // zkHost + ":" + zkPort
-    private final String zkEnsemble;
-    // distributedlog:// URI of the namespace served by this emulator
-    private final URI uri;
-    // Directories created by newBookie(); deleted again in teardown().
-    private final List<File> tmpDirs = new ArrayList<File>();
-    private final int zkTimeoutSec;
-    // Thread that launches the local bookies; started by start(), interrupted by teardown().
-    private final Thread bkStartupThread;
-    private final String zkHost;
-    private final int zkPort;
-    private final int numBookies;
-
-    /**
-     * Fluent builder for {@link LocalDLMEmulator}. Every setter returns this builder;
-     * settings that are not overridden keep the class defaults.
-     */
-    public static class Builder {
-        private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
-        private int numBookies = DEFAULT_NUM_BOOKIES;
-        private String zkHost = DEFAULT_ZK_HOST;
-        private int zkPort = DEFAULT_ZK_PORT;
-        private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT;
-        private boolean shouldStartZK = true;
-        private Optional<ServerConfiguration> serverConf = Optional.absent();
-
-        /** Sets the number of bookies to start. */
-        public Builder numBookies(int numBookies) {
-            this.numBookies = numBookies;
-            return this;
-        }
-
-        /** Sets the zookeeper host. */
-        public Builder zkHost(String zkHost) {
-            this.zkHost = zkHost;
-            return this;
-        }
-
-        /** Sets the zookeeper port. */
-        public Builder zkPort(int zkPort) {
-            this.zkPort = zkPort;
-            return this;
-        }
-
-        /** Sets the zookeeper timeout, in seconds. */
-        public Builder zkTimeoutSec(int zkTimeoutSec) {
-            this.zkTimeoutSec = zkTimeoutSec;
-            return this;
-        }
-
-        /** Sets the initial bookie port handed to the local bookie launcher. */
-        public Builder initialBookiePort(int initialBookiePort) {
-            this.initialBookiePort = initialBookiePort;
-            return this;
-        }
-
-        /** Sets whether the emulator should also start zookeeper. */
-        public Builder shouldStartZK(boolean shouldStartZK) {
-            this.shouldStartZK = shouldStartZK;
-            return this;
-        }
-
-        /** Supplies the bookie server configuration to use instead of the default one. */
-        public Builder serverConf(ServerConfiguration serverConf) {
-            this.serverConf = Optional.of(serverConf);
-            return this;
-        }
-
-        /**
-         * Creates the emulator from the current settings.
-         * <p>
-         * The chosen configuration is copied into a fresh {@link ServerConfiguration} with
-         * loopback allowed. The zookeeper timeout is written into the configuration only
-         * when the default configuration is used; a configuration supplied through
-         * {@link #serverConf(ServerConfiguration)} is loaded as it is.
-         *
-         * @return a new, not yet started, emulator
-         * @throws Exception if the emulator could not be created
-         */
-        public LocalDLMEmulator build() throws Exception {
-            final ServerConfiguration baseConf;
-            if (!serverConf.isPresent()) {
-                baseConf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
-                baseConf.setZkTimeout(zkTimeoutSec * 1000);
-            } else {
-                baseConf = serverConf.get();
-            }
-
-            final ServerConfiguration effectiveConf = new ServerConfiguration();
-            effectiveConf.loadConf(baseConf);
-            effectiveConf.setAllowLoopback(true);
-
-            return new LocalDLMEmulator(
-                numBookies,
-                shouldStartZK,
-                zkHost,
-                zkPort,
-                initialBookiePort,
-                zkTimeoutSec,
-                effectiveConf);
-        }
-    }
-
-    /**
-     * @return a new {@link Builder} preset with the default settings
-     */
-    public static Builder newBuilder() {
-        final Builder builder = new Builder();
-        return builder;
-    }
-
-    /**
-     * Records the emulator settings and prepares, without starting, the thread that
-     * launches the local bookies. The thread is started by {@link #start()} and
-     * interrupted by {@link #teardown()}.
-     *
-     * @param numBookies number of bookies to start
-     * @param shouldStartZK whether the launcher should also start zookeeper
-     * @param zkHost zookeeper host
-     * @param zkPort zookeeper port
-     * @param initialBookiePort initial bookie port handed to the launcher
-     * @param zkTimeoutSec zookeeper timeout, in seconds
-     * @param serverConf bookie server configuration handed to the launcher
-     */
-    private LocalDLMEmulator(final int numBookies,
-                             final boolean shouldStartZK,
-                             final String zkHost,
-                             final int zkPort,
-                             final int initialBookiePort,
-                             final int zkTimeoutSec,
-                             final ServerConfiguration serverConf) throws Exception {
-        this.numBookies = numBookies;
-        this.zkHost = zkHost;
-        this.zkPort = zkPort;
-        this.zkEnsemble = zkHost + ":" + zkPort;
-        this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE);
-        this.zkTimeoutSec = zkTimeoutSec;
-        this.bkStartupThread = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
-                    LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
-                    // The placeholder needs its argument; without it the literal "{}" is logged.
-                    LOG.info("{} bookies are started.", numBookies);
-                } catch (InterruptedException e) {
-                    // go away quietly: teardown() interrupts this thread to stop it
-                } catch (Exception e) {
-                    LOG.error("Error starting local bk", e);
-                }
-            }
-        };
-    }
-
-    /**
-     * Starts the bookie launcher thread, waits for the zookeeper server and the expected
-     * number of bookies to come up, and then provisions the
-     * "/messaging/distributedlog" namespace.
-     *
-     * @throws Exception if the server at the zookeeper address does not come up within
-     *         the zookeeper timeout, if the number of registered bookies does not match
-     *         the requested number (reported as an {@link IOException}), or if
-     *         provisioning the namespace fails
-     */
-    public void start() throws Exception {
-        bkStartupThread.start();
-        if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) {
-            throw new Exception("Error starting zookeeper/bookkeeper");
-        }
-        int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
-        // An assert is skipped unless the JVM runs with -ea, so check explicitly:
-        // a partially started ensemble must not be reported as running.
-        if (numBookies != bookiesUp) {
-            throw new IOException("Expected " + numBookies + " bookies to be up but found " + bookiesUp);
-        }
-        // Provision "/messaging/distributedlog" namespace
-        DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
-    }
-
-    /**
-     * Interrupts the bookie launcher thread, waits for it to exit, and then deletes the
-     * temporary directories that were created by {@link #newBookie()}.
-     *
-     * @throws Exception if waiting for the thread is interrupted or a directory cannot be deleted
-     */
-    public void teardown() throws Exception {
-        if (null != bkStartupThread) {
-            bkStartupThread.interrupt();
-            bkStartupThread.join();
-        }
-        for (File bookieDir : tmpDirs) {
-            FileUtils.deleteDirectory(bookieDir);
-        }
-    }
-
-    /**
-     * @return the zookeeper address of this emulator in {@code host:port} form
-     */
-    public String getZkServers() {
-        return this.zkEnsemble;
-    }
-
-    /**
-     * @return the {@code distributedlog://} URI of the namespace served by this emulator
-     */
-    public URI getUri() {
-        return this.uri;
-    }
-
-    /**
-     * Starts one additional bookie on an ephemeral port (port 0), registered against
-     * this emulator's zookeeper, with its journal and ledgers stored in a fresh temporary
-     * directory. The directory is remembered so that {@link #teardown()} can delete it.
-     * <p>
-     * After starting the server this method polls {@code isRunning()} up to 10 times,
-     * sleeping 10 seconds between polls.
-     *
-     * @return the running bookie server
-     * @throws IOException if the temporary directory cannot be created or the bookie is
-     *         still not running after the last poll
-     * @throws Exception if the bookie fails to start
-     */
-    public BookieServer newBookie() throws Exception {
-        final ServerConfiguration conf = new ServerConfiguration();
-
-        // createTempFile only reserves a unique name; swap the file for a directory.
-        final File bookieDir = File.createTempFile("bookie" + UUID.randomUUID() + "_", "test");
-        if (!bookieDir.delete()) {
-            LOG.debug("Fail to delete tmpdir " + bookieDir);
-        }
-        if (!bookieDir.mkdir()) {
-            throw new IOException("Fail to create tmpdir " + bookieDir);
-        }
-        tmpDirs.add(bookieDir);
-
-        conf.setZkTimeout(zkTimeoutSec * 1000);
-        conf.setBookiePort(0);
-        conf.setAllowLoopback(true);
-        conf.setZkServers(zkEnsemble);
-        conf.setJournalDirName(bookieDir.getPath());
-        conf.setLedgerDirNames(new String[]{bookieDir.getPath()});
-
-        final BookieServer server = new BookieServer(conf);
-        server.start();
-
-        int polls = 0;
-        while (polls < 10 && !server.isRunning()) {
-            Thread.sleep(10000);
-            polls++;
-        }
-        if (server.isRunning()) {
-            return server;
-        }
-        throw new IOException("Bookie would not start");
-    }
-
-    /**
-     * Polls zookeeper once per second until exactly {@code count} bookies are registered
-     * under "/ledgers/available" (the "readonly" child is not counted) or until
-     * {@code timeout} polls have been made.
-     * <p>
-     * Reaching the timeout does not raise an exception: the caller has to compare the
-     * returned value with {@code count}.
-     *
-     * @param count number of bookies required
-     * @param timeout number of seconds to wait for bookies to start
-     * @return the number of bookies seen by the last successful poll (0 if none succeeded)
-     * @throws Exception if connecting to zookeeper fails or the wait is interrupted
-     */
-    public int checkBookiesUp(int count, int timeout) throws Exception {
-        final ZooKeeper zk = connectZooKeeper(zkHost, zkPort, zkTimeoutSec);
-        try {
-            int lastSeen = 0;
-            int poll = 0;
-            while (poll < timeout) {
-                try {
-                    List<String> registered = zk.getChildren("/ledgers/available", false);
-                    registered.remove("readonly");
-                    lastSeen = registered.size();
-
-                    // More bookies than requested is always reported, regardless of log level.
-                    final boolean tooMany = lastSeen > count;
-                    if (tooMany || LOG.isDebugEnabled()) {
-                        LOG.info("Found " + lastSeen + " bookies up, "
-                            + "waiting for " + count);
-                        if (tooMany || LOG.isTraceEnabled()) {
-                            for (String bookie : registered) {
-                                LOG.info(" server: " + bookie);
-                            }
-                        }
-                    }
-                    if (lastSeen == count) {
-                        return lastSeen;
-                    }
-                } catch (KeeperException e) {
-                    // ignore and poll again
-                }
-                Thread.sleep(1000);
-                poll++;
-            }
-            return lastSeen;
-        } finally {
-            zk.close();
-        }
-    }
-
-    /**
-     * @return the zookeeper path of the bookkeeper ledgers root, "/ledgers"
-     */
-    public static String getBkLedgerPath() {
-        final String ledgersPath = "/ledgers";
-        return ledgersPath;
-    }
-
-    /**
-     * Connects to zookeeper using the default timeout.
-     *
-     * @param zkHost zookeeper host
-     * @param zkPort zookeeper port
-     * @return a connected zookeeper client
-     * @throws IOException if the connection is not established within the timeout
-     * @see #connectZooKeeper(String, int, int)
-     */
-    public static ZooKeeper connectZooKeeper(String zkHost, int zkPort)
-        throws IOException, KeeperException, InterruptedException {
-        final int timeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
-        return connectZooKeeper(zkHost, zkPort, timeoutSec);
-    }
-
-    /**
-     * Creates a zookeeper client for {@code zkHost:zkPort} and waits until it reports
-     * the {@code SyncConnected} state.
-     * <p>
-     * {@code zkTimeoutSec} is used both as the session timeout (converted to
-     * milliseconds) and as the maximum time to wait for the connection. If the client
-     * does not connect in time, or the wait is interrupted, the client is closed before
-     * the exception propagates so it is not leaked.
-     *
-     * @param zkHost zookeeper host
-     * @param zkPort zookeeper port
-     * @param zkTimeoutSec timeout, in seconds
-     * @return a connected zookeeper client; the caller is responsible for closing it
-     * @throws IOException if the client cannot be created or does not connect within the timeout
-     * @throws InterruptedException if the wait for the connection is interrupted
-     */
-    public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec)
-        throws IOException, KeeperException, InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final String zkHostPort = zkHost + ":" + zkPort;
-
-        ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (event.getState() == Event.KeeperState.SyncConnected) {
-                    latch.countDown();
-                }
-            }
-        });
-        boolean connected = false;
-        try {
-            connected = latch.await(zkTimeoutSec, TimeUnit.SECONDS);
-            if (!connected) {
-                throw new IOException("Zookeeper took too long to connect");
-            }
-            return zkc;
-        } finally {
-            if (!connected) {
-                // Timed out or interrupted: release the client instead of leaking it.
-                zkc.close();
-            }
-        }
-    }
-
-    /**
-     * Builds a distributedlog URI for {@code path} against the default zookeeper address.
-     *
-     * @param path path appended after the distributedlog namespace
-     * @return the resulting URI
-     * @see #createDLMURI(String, String)
-     */
-    public static URI createDLMURI(String path) throws Exception {
-        final String zkServers = DEFAULT_ZK_ENSEMBLE;
-        return createDLMURI(zkServers, path);
-    }
-
-    /**
-     * Builds the URI {@code distributedlog://<zkServers><DLOG_NAMESPACE><path>}.
-     *
-     * @param zkServers zookeeper address placed in the authority part of the URI
-     * @param path path appended after the distributedlog namespace
-     * @return the resulting URI
-     */
-    public static URI createDLMURI(String zkServers, String path) throws Exception {
-        final String location = "distributedlog://" + zkServers + DLOG_NAMESPACE + path;
-        return URI.create(location);
-    }
-
-    /**
-     * Try to start zookeeper locally, beginning the port search at a random base port
-     * between 7000 (inclusive) and 17000 (exclusive).
-     *
-     * @param zkDir directory handed to the zookeeper server
-     * @return the started server together with the port it is bound to
-     * @see #runZookeeperOnAnyPort(int, File)
-     */
-    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception {
-        final int randomBasePort = (int) (Math.random()*10000+7000);
-        return runZookeeperOnAnyPort(randomBasePort, zkDir);
-    }
-
-    /**
-     * Try to start zookeeper locally on any port, beginning with some base port.
-     * <p>
-     * Each time binding fails with a {@link BindException} the next port is tried,
-     * wrapping from 65535 back to 1025. After more than 20 failed attempts the last
-     * {@link BindException} is rethrown.
-     *
-     * @param basePort first port to try
-     * @param zkDir directory handed to the zookeeper server
-     * @return the started server together with the port it is bound to
-     * @throws BindException if no port could be bound within the retry limit
-     */
-    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception {
-
-        final int MAX_RETRIES = 20;
-        final int MIN_PORT = 1025;
-        final int MAX_PORT = 65535;
-
-        int port = basePort;
-        int failures = 0;
-        while (true) {
-            try {
-                LOG.info("zk trying to bind to port " + port);
-                final ZooKeeperServerShim server = LocalBookKeeper.runZookeeper(1000, port, zkDir);
-                return Pair.of(server, port);
-            } catch (BindException be) {
-                failures++;
-                if (failures > MAX_RETRIES) {
-                    throw be;
-                }
-                port++;
-                if (port > MAX_PORT) {
-                    port = MIN_PORT;
-                }
-            }
-        }
-    }
-
-    /**
-     * Command line entry point: starts a sandbox emulator whose zookeeper port is given
-     * as the first argument, and registers a shutdown hook that tears the emulator down
-     * and deletes the temporary zookeeper directory.
-     * <p>
-     * Any exception raised while starting is printed to standard output.
-     *
-     * @param args command line arguments; {@code args[0]} is the zookeeper port
-     */
-    public static void main(String[] args) throws Exception {
-        try {
-            if (args.length < 1) {
-                System.out.println("Usage: LocalDLEmulator <zk_port>");
-                System.exit(-1);
-            }
-
-            final int port = Integer.parseInt(args[0]);
-            final File zkTmpDir = IOUtils.createTempDir("distrlog", "zookeeper");
-            final LocalDLMEmulator emulator = LocalDLMEmulator.newBuilder()
-                .zkPort(port)
-                .build();
-
-            final Thread cleanupHook = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        emulator.teardown();
-                        FileUtils.deleteDirectory(zkTmpDir);
-                        System.out.println("ByeBye!");
-                    } catch (Exception e) {
-                        // do nothing
-                    }
-                }
-            };
-            Runtime.getRuntime().addShutdownHook(cleanupHook);
-            emulator.start();
-
-            final String banner = String.format(
-                "DistributedLog Sandbox is running now. You could access distributedlog://%s:%s",
-                DEFAULT_ZK_HOST,
-                port);
-            System.out.println(banner);
-        } catch (Exception ex) {
-            System.out.println("Exception occurred running emulator " + ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
deleted file mode 100644
index c12de29..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * <i>LogReader</i> is a `synchronous` reader that reads records from a DL log.
- *
- * <h3>Lifecycle of a Reader</h3>
- *
- * A reader is a <i>sequential</i> reader that reads records from a DL log starting
- * from a given position. The position could be a <i>DLSN</i>
- * (via {@link DistributedLogManager#getInputStream(DLSN)}) or a <i>Transaction ID</i>
- * (via {@link DistributedLogManager#getInputStream(long)}).
- * <p>
- * After the reader is open, call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
- * to read records out of the log from the provided position.
- * <p>
- * Closing the reader (via {@link #close()}) releases all the resources occupied
- * by this reader instance.
- * <p>
- * Exceptions can be thrown while reading records. Once an exception is thrown,
- * the reader is set to an error state and is not usable anymore. It is the application's
- * responsibility to handle the exceptions and re-create readers if necessary.
- * <p>
- * Example:
- * <pre>
- * DistributedLogManager dlm = ...;
- * long nextTxId = ...;
- * LogReader reader = dlm.getInputStream(nextTxId);
- *
- * while (true) { // keep reading and processing records
- *     LogRecord record;
- *     try {
- *         record = reader.readNext(false);
- *         if (null == record) {
- *             // caught up with the writer; nothing to process yet
- *             continue;
- *         }
- *         nextTxId = record.getTransactionId();
- *         // process the record
- *         ...
- *     } catch (IOException ioe) {
- *         // handle the exception
- *         ...
- *         reader = dlm.getInputStream(nextTxId + 1);
- *     }
- * }
- *
- * </pre>
- *
- * <h3>Read Records</h3>
- *
- * Reading records from an <i>endless</i> log in a `synchronous` way isn't as
- * trivial as in an `asynchronous` way (via {@link AsyncLogReader}), because it
- * lacks a callback mechanism. LogReader introduces a flag `nonBlocking` for
- * controlling the <i>waiting</i> behavior of `synchronous` reads.
- *
- * <h4>Blocking vs NonBlocking</h4>
- *
- * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
- * before returning from read calls, while <i>NonBlocking</i> (nonBlocking = true)
- * means the reads will only check the readahead cache and return whatever records
- * are available in the readahead cache.
- * <p>
- * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
- * catching up with the writer (there are records in the log), the read call will
- * wait until records are read and returned. If the reader is caught up with the
- * writer (there are no more records in the log at read time), the read call
- * will wait for a small period of time (defined in
- * {@link DistributedLogConfiguration#getReadAheadWaitTime()}) and return whatever
- * records are available in the readahead cache. In other words, if a reader sees
- * no record on blocking reads, it means the reader is `caught-up` with the
- * writer.
- * <p>
- * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
- * state machines. Applications could use <i>blocking</i> reads until they are caught up
- * with the latest data. Once they are caught up, they could start
- * serving their service, switch to <i>non-blocking</i> read mode and tail read
- * data from the logs.
- * <p>
- * See examples below.
- *
- * <h4>Read Single Record</h4>
- *
- * {@link #readNext(boolean)} reads individual records from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- *
- * // keep reading records in blocking way until no records available in the log
- * LogRecord record = reader.readNext(false);
- * while (null != record) {
- *     // process the record
- *     ...
- *     // read next record
- *     record = reader.readNext(false);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- *     record = reader.readNext(true);
- *     // process the new records
- *     ...
- * }
- * </pre>
- *
- * <h4>Read Batch of Records</h4>
- *
- * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
- * from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- * int N = 10;
- *
- * // keep reading N records in blocking way until no records available in the log
- * List&lt;LogRecordWithDLSN&gt; records = reader.readBulk(false, N);
- * while (!records.isEmpty()) {
- *     // process the list of records
- *     ...
- *     if (records.size() &lt; N) { // no more records available in the log
- *         break;
- *     }
- *     // read next N records
- *     records = reader.readBulk(false, N);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- *     records = reader.readBulk(true, N);
- *     // process the new records
- *     ...
- * }
- *
- * </pre>
- *
- * NOTE:
- * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
- *    the {@link AsyncCloseable} interface so the reader could be closed asynchronously
- *
- * @see AsyncLogReader
- */
-public interface LogReader extends Closeable, AsyncCloseable {
-
-    /**
-     * Read the next log record from the stream.
-     * <p>
-     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
-     * records from the read ahead cache. It returns <i>null</i> if there aren't any records
-     * available in the read ahead cache.
-     * <p>
-     * If <i>nonBlocking</i> is set to false, it makes a blocking call. The call will
-     * block until a record can be returned if there are records in the stream (aka catching up).
-     * Otherwise it waits up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
-     * milliseconds and returns null if there aren't any more records in the stream.
-     *
-     * @param nonBlocking should the read make blocking calls to the backend or rely on the
-     * readAhead cache
-     * @return an operation from the stream or null if at end of stream
-     * @throws IOException if there is an error reading from the stream
-     */
-    LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
-
-    /**
-     * Read the next <i>numLogRecords</i> log records from the stream.
-     *
-     * @param nonBlocking should the read make blocking calls to the backend or rely on the
-     * readAhead cache
-     * @param numLogRecords maximum number of log records returned by this call.
-     * @return the operations read from the stream or empty list if at end of stream
-     * @throws IOException if there is an error reading from the stream
-     * @see #readNext(boolean)
-     */
-    List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
-}