Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:10 UTC

[05/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
new file mode 100644
index 0000000..f951991
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+public class DistributedLogConstants {
+    public static final byte[] EMPTY_BYTES = new byte[0];
+    public static final String SCHEME_PREFIX = "distributedlog";
+    public static final String BACKEND_BK = "bk";
+    public static final long INVALID_TXID = -999;
+    public static final long EMPTY_LOGSEGMENT_TX_ID = -99;
+    public static final long MAX_TXID = Long.MAX_VALUE;
+    public static final long SMALL_LOGSEGMENT_THRESHOLD = 10;
+    public static final int LOGSEGMENT_NAME_VERSION = 1;
+    public static final int FUTURE_TIMEOUT_IMMEDIATE = 0;
+    public static final int FUTURE_TIMEOUT_INFINITE = -1;
+    public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE;
+    public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE;
+    public static final long LOCK_OP_TIMEOUT_DEFAULT = 120;
+    public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120;
+    public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId";
+    public static final int LOCAL_REGION_ID = 0;
+    public static final long LOGSEGMENT_DEFAULT_STATUS = 0;
+    public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0;
+    public static final long UNASSIGNED_SEQUENCE_ID = -1L;
+    public static final long FIRST_LOGSEGMENT_SEQNO = 1;
+    public static final long UNRESOLVED_LEDGER_ID = -1;
+    public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1);
+    public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1;
+    public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2;
+
+    public static final String ALLOCATION_POOL_NODE = ".allocation_pool";
+    // log segment prefix
+    public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
+    public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
+    public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
+    static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
+
+    // An ACL that gives all permissions to node creators and read permissions only to everyone else.
+    public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
+        ImmutableList.<ACL>builder()
+            .addAll(Ids.CREATOR_ALL_ACL)
+            .addAll(Ids.READ_ACL_UNSAFE)
+            .build();
+}
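
For illustration only (not part of the commit): a minimal sketch of how constants such as EVERYONE_READ_CREATOR_ALL and EMPTY_BYTES might be applied when creating a metadata znode with the stock ZooKeeper client. The connect string, session timeout, and path below are placeholders.

    import org.apache.distributedlog.DistributedLogConstants;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooKeeper;

    public class AclUsageSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder connect string and path; real deployments derive these from the namespace URI.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            zk.create("/messaging/distributedlog/mystream",
                      DistributedLogConstants.EMPTY_BYTES,
                      DistributedLogConstants.EVERYONE_READ_CREATOR_ALL,
                      CreateMode.PERSISTENT);
            zk.close();
        }
    }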

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
new file mode 100644
index 0000000..7d33e9c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A DistributedLogManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or fails and is restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the edit log is first opened.
+ */
+public interface DistributedLogManager extends AsyncCloseable, Closeable {
+
+    /**
+     * Get the name of the stream managed by this log manager
+     * @return streamName
+     */
+    public String getStreamName();
+
+    /**
+     * Get the namespace driver used by this manager.
+     *
+     * @return the namespace driver
+     */
+    public NamespaceDriver getNamespaceDriver();
+
+    /**
+     * Get log segments.
+     *
+     * @return log segments
+     * @throws IOException
+     */
+    public List<LogSegmentMetadata> getLogSegments() throws IOException;
+
+    /**
+     * Register <i>listener</i> on log segment updates of this stream.
+     *
+     * @param listener
+     *          listener to receive updates on the log segment list.
+     */
+    public void registerListener(LogSegmentListener listener) throws IOException;
+
+    /**
+     * Unregister <i>listener</i> on log segment updates from this stream.
+     *
+     * @param listener
+     *          listener to receive updates on the log segment list.
+     */
+    public void unregisterListener(LogSegmentListener listener);
+
+    /**
+     * Open async log writer to write records to the log stream.
+     *
+     * @return result represents the open result
+     */
+    public Future<AsyncLogWriter> openAsyncLogWriter();
+
+    /**
+     * Begin writing to the log stream identified by the name
+     *
+     * @return the writer interface to generate log records
+     */
+    public LogWriter startLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin writing to the log stream identified by the name
+     *
+     * @return the writer interface to generate log records
+     */
+    // @Deprecated
+    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin appending to the end of the log stream which is being treated as a sequence of bytes
+     *
+     * @return the writer interface to generate log records
+     */
+    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
+
+    /**
+     * Get a reader to read a log stream as a sequence of bytes
+     *
+     * @return the reader interface to read from the log stream
+     */
+    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
+
+    /**
+     * Get the input stream starting with fromTxnId for the specified log
+     *
+     * @param fromTxnId - the first transaction id we want to read
+     * @return the stream starting with transaction fromTxnId
+     * @throws IOException if a stream cannot be found.
+     */
+    public LogReader getInputStream(long fromTxnId)
+        throws IOException;
+
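+    /**
+     * Get the input stream starting with fromDLSN for the specified log.
+     *
+     * @param fromDLSN - the first DLSN we want to read
+     * @return the stream starting with DLSN fromDLSN
+     * @throws IOException if a stream cannot be found.
+     */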
+    public LogReader getInputStream(DLSN fromDLSN) throws IOException;
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
+     *
+     * @param fromTxnId
+     *          transaction id to start reading from
+     * @return async log reader
+     */
+    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
+     *
+     * @param fromDLSN
+     *          dlsn to start reading from
+     * @return async log reader
+     */
+    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
+
+    // @Deprecated
+    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
+
+    // @Deprecated
+    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
+
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
+
+    /**
+     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
+     * If two readers try to open a reader with the same subscriberId, one will succeed, while
+     * the other will be blocked until it acquires the lock.
+     *
+     * @param fromDLSN
+     *          start dlsn
+     * @param subscriberId
+     *          subscriber id
+     * @return async log reader
+     */
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
+
+    /**
+     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
+     * its last commit position recorded in the subscription store. If no last commit position is
+     * found in the subscription store, it will start reading from the head of the stream.
+     *
+     * If two readers try to open a reader with the same subscriberId, one will succeed, while
+     * the other will be blocked until it acquires the lock.
+     *
+     * @param subscriberId
+     *          subscriber id
+     * @return async log reader
+     */
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
+
+    /**
+     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
+     *
+     * @param transactionId
+     *          transaction id
+     * @return dlsn of first log record whose transaction id is not less than transactionId.
+     */
+    public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
+
+    /**
+     * Get the last log record in the stream
+     *
+     * @return the last log record in the stream
+     * @throws IOException if a stream cannot be found.
+     */
+    public LogRecordWithDLSN getLastLogRecord()
+        throws IOException;
+
+    /**
+     * Get the earliest Transaction Id available in the log
+     *
+     * @return earliest transaction id
+     * @throws IOException
+     */
+    public long getFirstTxId() throws IOException;
+
+    /**
+     * Get Latest Transaction Id in the log
+     *
+     * @return latest transaction id
+     * @throws IOException
+     */
+    public long getLastTxId() throws IOException;
+
+    /**
+     * Get Latest DLSN in the log
+     *
+     * @return last dlsn
+     * @throws IOException
+     */
+    public DLSN getLastDLSN() throws IOException;
+
+    /**
+     * Get Latest log record with DLSN in the log - async
+     *
+     * @return latest log record with DLSN
+     */
+    public Future<LogRecordWithDLSN> getLastLogRecordAsync();
+
+    /**
+     * Get Latest Transaction Id in the log - async
+     *
+     * @return latest transaction id
+     */
+    public Future<Long> getLastTxIdAsync();
+
+    /**
+     * Get first DLSN in the log.
+     *
+     * @return first dlsn in the stream
+     */
+    public Future<DLSN> getFirstDLSNAsync();
+
+    /**
+     * Get Latest DLSN in the log - async
+     *
+     * @return latest dlsn in the log
+     */
+    public Future<DLSN> getLastDLSNAsync();
+
+    /**
+     * Get the number of log records in the active portion of the log.
+     * Any log segments that have already been truncated will not be included.
+     *
+     * @return number of log records
+     * @throws IOException
+     */
+    public long getLogRecordCount() throws IOException;
+
+    /**
+     * Get the number of log records in the active portion of the log - async.
+     * Any log segments that have already been truncated will not be included.
+     *
+     * @param beginDLSN
+     *          dlsn to start counting records from
+     * @return future number of log records
+     * @throws IOException
+     */
+    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
+
+    /**
+     * Run recovery on the log.
+     *
+     * @throws IOException
+     */
+    public void recover() throws IOException;
+
+    /**
+     * Check if an end of stream marker was added to the stream.
+     * A stream with an end of stream marker cannot be appended to.
+     *
+     * @return true if the marker was added to the stream, false otherwise
+     * @throws IOException
+     */
+    public boolean isEndOfStreamMarked() throws IOException;
+
+    /**
+     * Delete the log.
+     *
+     * @throws IOException if the deletion fails
+     */
+    public void delete() throws IOException;
+
+    /**
+     * The DistributedLogManager may archive/purge any logs for transactionId
+     * less than or equal to <code>minTxIdToKeep</code>.
+     * This is to be used only when the client explicitly manages deletion. If
+     * the cleanup policy is based on a sliding time window, then this method need
+     * not be called.
+     *
+     * @param minTxIdToKeep the earliest txid that must be retained
+     * @throws IOException if purging fails
+     */
+    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+    /**
+     * Get the subscriptions store provided by the distributedlog manager.
+     *
+     * @return subscriptions store that manages subscriptions for the current stream.
+     */
+    public SubscriptionsStore getSubscriptionsStore();
+
+}
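
For illustration only (not part of the commit): a short usage sketch against the interface above. It uses only methods declared in this file plus com.twitter.util.Await to block on the returned Futures. How the DistributedLogManager instance is obtained (normally from a DistributedLog namespace) is outside this file.

    import com.twitter.util.Await;

    import org.apache.distributedlog.AsyncLogReader;
    import org.apache.distributedlog.DLSN;
    import org.apache.distributedlog.DistributedLogManager;

    public class ManagerUsageSketch {

        // The manager instance is assumed to be provided by a DistributedLog namespace,
        // which is outside this file.
        static void inspectStream(DistributedLogManager dlm) throws Exception {
            // Count the records still present in the active (non-truncated) portion of the log.
            long recordCount = dlm.getLogRecordCount();
            System.out.println("stream " + dlm.getStreamName() + " has " + recordCount + " records");

            // Resolve the first DLSN of the stream and open an async reader starting from it.
            DLSN firstDlsn = Await.result(dlm.getFirstDLSNAsync());
            AsyncLogReader reader = Await.result(dlm.openAsyncLogReader(firstDlsn));
            // Reading records through the AsyncLogReader API is defined elsewhere and not shown here.

            dlm.close();
        }
    }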

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
new file mode 100644
index 0000000..617282c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A set of {@link LogRecord}s.
+ */
+public class Entry {
+
+    /**
+     * Create a new log record set.
+     *
+     * @param logName
+     *          name of the log
+     * @param initialBufferSize
+     *          initial buffer size
+     * @param envelopeBeforeTransmit
+     *          whether to envelope the buffer before transmitting
+     * @param codec
+     *          compression codec
+     * @param statsLogger
+     *          stats logger to receive stats
+     * @return writer to build a log record set.
+     */
+    public static Writer newEntry(
+            String logName,
+            int initialBufferSize,
+            boolean envelopeBeforeTransmit,
+            CompressionCodec.Type codec,
+            StatsLogger statsLogger) {
+        return new EnvelopedEntryWriter(
+                logName,
+                initialBufferSize,
+                envelopeBeforeTransmit,
+                codec,
+                statsLogger);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Build the record set object.
+     */
+    public static class Builder {
+
+        private long logSegmentSequenceNumber = -1;
+        private long entryId = -1;
+        private long startSequenceId = Long.MIN_VALUE;
+        private boolean envelopeEntry = true;
+        // input stream
+        private InputStream in = null;
+        // or bytes array
+        private byte[] data = null;
+        private int offset = -1;
+        private int length = -1;
+        private Optional<Long> txidToSkipTo = Optional.absent();
+        private Optional<DLSN> dlsnToSkipTo = Optional.absent();
+        private boolean deserializeRecordSet = true;
+
+        private Builder() {}
+
+        /**
+         * Reset the builder.
+         *
+         * @return builder
+         */
+        public Builder reset() {
+            logSegmentSequenceNumber = -1;
+            entryId = -1;
+            startSequenceId = Long.MIN_VALUE;
+            envelopeEntry = true;
+            // input stream
+            in = null;
+            // or bytes array
+            data = null;
+            offset = -1;
+            length = -1;
+            txidToSkipTo = Optional.absent();
+            dlsnToSkipTo = Optional.absent();
+            return this;
+        }
+
+        /**
+         * Set the segment info of the log segment that this record
+         * set belongs to.
+         *
+         * @param lssn
+         *          log segment sequence number
+         * @param startSequenceId
+         *          start sequence id of this log segment
+         * @return builder
+         */
+        public Builder setLogSegmentInfo(long lssn, long startSequenceId) {
+            this.logSegmentSequenceNumber = lssn;
+            this.startSequenceId = startSequenceId;
+            return this;
+        }
+
+        /**
+         * Set the entry id of this log record set.
+         *
+         * @param entryId
+         *          entry id assigned for this log record set.
+         * @return builder
+         */
+        public Builder setEntryId(long entryId) {
+            this.entryId = entryId;
+            return this;
+        }
+
+        /**
+         * Set whether this record set is enveloped or not.
+         *
+         * @param enabled
+         *          flag indicates whether this record set is enveloped or not.
+         * @return builder
+         */
+        public Builder setEnvelopeEntry(boolean enabled) {
+            this.envelopeEntry = enabled;
+            return this;
+        }
+
+        /**
+         * Set the serialized bytes data of this record set.
+         *
+         * @param data
+         *          serialized bytes data of this record set.
+         * @param offset
+         *          offset of the bytes data
+         * @param length
+         *          length of the bytes data
+         * @return builder
+         */
+        public Builder setData(byte[] data, int offset, int length) {
+            this.data = data;
+            this.offset = offset;
+            this.length = length;
+            return this;
+        }
+
+        /**
+         * Set the input stream of the serialized bytes data of this record set.
+         *
+         * @param in
+         *          input stream
+         * @return builder
+         */
+        public Builder setInputStream(InputStream in) {
+            this.in = in;
+            return this;
+        }
+
+        /**
+         * Set the record set to start from <code>dlsn</code>.
+         *
+         * @param dlsn
+         *          dlsn to skip to
+         * @return builder
+         */
+        public Builder skipTo(@Nullable DLSN dlsn) {
+            this.dlsnToSkipTo = Optional.fromNullable(dlsn);
+            return this;
+        }
+
+        /**
+         * Set the record set to start from <code>txid</code>.
+         *
+         * @param txid
+         *          txid to skip to
+         * @return builder
+         */
+        public Builder skipTo(long txid) {
+            this.txidToSkipTo = Optional.of(txid);
+            return this;
+        }
+
+        /**
+         * Enable/disable deserializing the record set.
+         *
+         * @param enabled
+         *          flag to enable/disable deserializing the record set.
+         * @return builder
+         */
+        public Builder deserializeRecordSet(boolean enabled) {
+            this.deserializeRecordSet = enabled;
+            return this;
+        }
+
+        public Entry build() {
+            Preconditions.checkNotNull(data, "Serialized data isn't provided");
+            Preconditions.checkArgument(offset >= 0 && length >= 0
+                    && (offset + length) <= data.length,
+                    "Invalid offset or length of serialized data");
+            return new Entry(
+                    logSegmentSequenceNumber,
+                    entryId,
+                    startSequenceId,
+                    envelopeEntry,
+                    deserializeRecordSet,
+                    data,
+                    offset,
+                    length,
+                    txidToSkipTo,
+                    dlsnToSkipTo);
+        }
+
+        public Entry.Reader buildReader() throws IOException {
+            Preconditions.checkArgument(data != null || in != null,
+                    "Serialized data or input stream isn't provided");
+            InputStream in;
+            if (null != this.in) {
+                in = this.in;
+            } else {
+                Preconditions.checkArgument(offset >= 0 && length >= 0
+                                && (offset + length) <= data.length,
+                        "Invalid offset or length of serialized data");
+                in = new ByteArrayInputStream(data, offset, length);
+            }
+            return new EnvelopedEntryReader(
+                    logSegmentSequenceNumber,
+                    entryId,
+                    startSequenceId,
+                    in,
+                    envelopeEntry,
+                    deserializeRecordSet,
+                    NullStatsLogger.INSTANCE);
+        }
+
+    }
+
+    private final long logSegmentSequenceNumber;
+    private final long entryId;
+    private final long startSequenceId;
+    private final boolean envelopedEntry;
+    private final boolean deserializeRecordSet;
+    private final byte[] data;
+    private final int offset;
+    private final int length;
+    private final Optional<Long> txidToSkipTo;
+    private final Optional<DLSN> dlsnToSkipTo;
+
+    private Entry(long logSegmentSequenceNumber,
+                  long entryId,
+                  long startSequenceId,
+                  boolean envelopedEntry,
+                  boolean deserializeRecordSet,
+                  byte[] data,
+                  int offset,
+                  int length,
+                  Optional<Long> txidToSkipTo,
+                  Optional<DLSN> dlsnToSkipTo) {
+        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
+        this.entryId = entryId;
+        this.startSequenceId = startSequenceId;
+        this.envelopedEntry = envelopedEntry;
+        this.deserializeRecordSet = deserializeRecordSet;
+        this.data = data;
+        this.offset = offset;
+        this.length = length;
+        this.txidToSkipTo = txidToSkipTo;
+        this.dlsnToSkipTo = dlsnToSkipTo;
+    }
+
+    /**
+     * Get raw data of this record set.
+     *
+     * @return raw data representation of this record set.
+     */
+    public byte[] getRawData() {
+        return data;
+    }
+
+    /**
+     * Create reader to iterate over this record set.
+     *
+     * @return reader to iterate over this record set.
+     * @throws IOException if the record set is invalid.
+     */
+    public Reader reader() throws IOException {
+        InputStream in = new ByteArrayInputStream(data, offset, length);
+        Reader reader = new EnvelopedEntryReader(
+                logSegmentSequenceNumber,
+                entryId,
+                startSequenceId,
+                in,
+                envelopedEntry,
+                deserializeRecordSet,
+                NullStatsLogger.INSTANCE);
+        if (txidToSkipTo.isPresent()) {
+            reader.skipTo(txidToSkipTo.get());
+        }
+        if (dlsnToSkipTo.isPresent()) {
+            reader.skipTo(dlsnToSkipTo.get());
+        }
+        return reader;
+    }
+
+    /**
+     * Writer to append {@link LogRecord}s to {@link Entry}.
+     */
+    public interface Writer extends EntryBuffer {
+
+        /**
+         * Write a {@link LogRecord} to this record set.
+         *
+         * @param record
+         *          record to write
+         * @param transmitPromise
+         *          callback for transmit result. the promise is only
+         *          satisfied when this record set is transmitted.
+         * @throws LogRecordTooLongException if the record is too long
+         * @throws WriteException when encountered exception writing the record
+         */
+        void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
+                throws LogRecordTooLongException, WriteException;
+
+        /**
+         * Reset the writer to write records.
+         */
+        void reset();
+
+    }
+
+    /**
+     * Reader to read {@link LogRecord}s from this record set.
+     */
+    public interface Reader {
+
+        /**
+         * Get the log segment sequence number.
+         *
+         * @return the log segment sequence number.
+         */
+        long getLSSN();
+
+        /**
+         * Return the entry id.
+         *
+         * @return the entry id.
+         */
+        long getEntryId();
+
+        /**
+         * Read next log record from this record set.
+         *
+         * @return next log record from this record set.
+         */
+        LogRecordWithDLSN nextRecord() throws IOException;
+
+        /**
+         * Skip the reader to the record whose transaction id is <code>txId</code>.
+         *
+         * @param txId
+         *          transaction id to skip to.
+         * @return true if skip succeeds, otherwise false.
+         * @throws IOException
+         */
+        boolean skipTo(long txId) throws IOException;
+
+        /**
+         * Skip the reader to the record whose DLSN is <code>dlsn</code>.
+         *
+         * @param dlsn
+         *          DLSN to skip to.
+         * @return true if skip succeeds, otherwise false.
+         * @throws IOException
+         */
+        boolean skipTo(DLSN dlsn) throws IOException;
+
+    }
+
+}
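
For illustration only (not part of the commit): a sketch of reading records back from a serialized entry via the Builder and Reader defined above. It assumes nextRecord() returns null once the record set is exhausted, which this file does not spell out, and that the payload bytes were written with enveloping enabled.

    import java.io.IOException;

    import org.apache.distributedlog.Entry;
    import org.apache.distributedlog.LogRecordWithDLSN;

    public class EntryReadSketch {

        // 'payload' is assumed to hold the serialized bytes of one entry as read from the log store.
        static void dumpEntry(byte[] payload, long lssn, long entryId, long startSequenceId)
                throws IOException {
            Entry.Reader reader = Entry.newBuilder()
                    .setLogSegmentInfo(lssn, startSequenceId)
                    .setEntryId(entryId)
                    .setEnvelopeEntry(true)               // entry was written with enveloping enabled
                    .setData(payload, 0, payload.length)
                    .buildReader();
            // Assumed: nextRecord() returns null when the record set is exhausted.
            LogRecordWithDLSN record = reader.nextRecord();
            while (null != record) {
                System.out.println(record);
                record = reader.nextRecord();
            }
        }
    }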

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
new file mode 100644
index 0000000..c695420
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.TransmitListener;
+
+import java.io.IOException;
+
+/**
+ * Write representation of a {@link Entry}.
+ * It is a buffer of log records, used for transmission.
+ */
+public interface EntryBuffer extends TransmitListener {
+
+    /**
+     * Return whether this record set contains user records.
+     *
+     * @return true if this record set contains user records, otherwise
+     * return false.
+     */
+    boolean hasUserRecords();
+
+    /**
+     * Return number of records in current record set.
+     *
+     * @return number of records in current record set.
+     */
+    int getNumRecords();
+
+    /**
+     * Return number of bytes in current record set.
+     *
+     * @return number of bytes in current record set.
+     */
+    int getNumBytes();
+
+    /**
+     * Return max tx id in current record set.
+     *
+     * @return max tx id.
+     */
+    long getMaxTxId();
+
+    /**
+     * Get the buffer to transmit.
+     *
+     * @return the buffer to transmit.
+     * @throws InvalidEnvelopedEntryException if the record set buffer is invalid
+     * @throws IOException when encountered IOException during serialization
+     */
+    Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
new file mode 100644
index 0000000..218662c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+/**
+ * The position of an entry, identified by log segment sequence number and entry id.
+ */
+class EntryPosition {
+
+    private long lssn;
+    private long entryId;
+
+    EntryPosition(long lssn, long entryId) {
+        this.lssn = lssn;
+        this.entryId = entryId;
+    }
+
+    public synchronized long getLogSegmentSequenceNumber() {
+        return lssn;
+    }
+
+    public synchronized long getEntryId() {
+        return entryId;
+    }
+
+    public synchronized boolean advance(long lssn, long entryId) {
+        if (lssn == this.lssn) {
+            if (entryId <= this.entryId) {
+                return false;
+            }
+            this.entryId = entryId;
+            return true;
+        } else if (lssn > this.lssn) {
+            this.lssn = lssn;
+            this.entryId = entryId;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("(").append(lssn).append(", ").append(entryId).append(")");
+        return sb.toString();
+    }
+}
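
For illustration only (not part of the commit): a tiny illustration of the advance() contract above. EntryPosition is package-private, so the sketch sits in the same package.

    package org.apache.distributedlog;

    // Illustration only: EntryPosition is package-private, so this sketch lives in the same package.
    class EntryPositionSketch {
        public static void main(String[] args) {
            EntryPosition pos = new EntryPosition(1L, 0L);
            System.out.println(pos.advance(1L, 5L));  // true:  same segment, larger entry id
            System.out.println(pos.advance(1L, 3L));  // false: would move backwards in the segment
            System.out.println(pos.advance(2L, 0L));  // true:  a newer log segment always advances
            System.out.println(pos.advance(1L, 9L));  // false: an older log segment is ignored
            System.out.println(pos);                  // prints "(2, 0)"
        }
    }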

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
new file mode 100644
index 0000000..eb1e9af
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
+import org.apache.distributedlog.util.BitMaskUtils;
+
+/**
+ * An enveloped entry written to BookKeeper.
+ *
+ * Data types are shown in parentheses. Interpretation should be based on data types rather than
+ * individual bytes, to honor endianness.
+ *
+ * Entry Structure:
+ * ---------------
+ * Bytes 0                                  : Version (Byte)
+ * Bytes 1 - (DATA = 1+Header.length-1)     : Header (Integer)
+ * Bytes DATA - DATA+3                      : Payload Length (Integer)
+ * Bytes DATA+4 - DATA+4+payload.length-1   : Payload (Byte[])
+ *
+ * V1 Header Structure: // Offsets relative to the start of the header.
+ * -------------------
+ * Bytes 0 - 3                              : Flags (Integer)
+ * Bytes 4 - 7                              : Original payload size before compression (Integer)
+ *
+ *      Flags: // 32 Bits
+ *      -----
+ *      0 ... 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+ *                                      |_|
+ *                                       |
+ *                               Compression Type
+ *
+ *      Compression Type: // 2 Bits (Least significant)
+ *      ----------------
+ *      00      : No Compression
+ *      01      : LZ4 Compression
+ *      10      : Unused
+ *      11      : Unused
+ */
+public class EnvelopedEntry {
+
+    public static final int VERSION_LENGTH = 1; // One byte long
+    public static final byte VERSION_ONE = 1;
+
+    public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
+    public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
+    public static final byte CURRENT_VERSION = VERSION_ONE;
+
+    private final OpStatsLogger compressionStat;
+    private final OpStatsLogger decompressionStat;
+    private final Counter compressedEntryBytes;
+    private final Counter decompressedEntryBytes;
+    private final byte version;
+
+    private Header header = new Header();
+    private Payload payloadCompressed = new Payload();
+    private Payload payloadDecompressed = new Payload();
+
+    public EnvelopedEntry(byte version,
+                          StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
+        Preconditions.checkNotNull(statsLogger);
+        if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
+            throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
+                    + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
+        }
+        this.version = version;
+        this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
+        this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
+        this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
+        this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
+    }
+
+    /**
+     * @param version
+     *          The envelope version to write
+     * @param compressionType
+     *          The compression type to use
+     * @param decompressed
+     *          The decompressed payload
+     *          NOTE: The size of the byte array passed as the decompressed payload can be larger
+     *                than the actual contents to be compressed.
+     * @param length
+     *          The length of the actual payload within the decompressed byte array
+     * @param statsLogger
+     *          Used for getting stats for (de)compression time
+     */
+    public EnvelopedEntry(byte version,
+                          CompressionCodec.Type compressionType,
+                          byte[] decompressed,
+                          int length,
+                          StatsLogger statsLogger)
+            throws InvalidEnvelopedEntryException {
+        this(version, statsLogger);
+        Preconditions.checkNotNull(compressionType);
+        Preconditions.checkNotNull(decompressed);
+        Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);
+
+        this.header = new Header(compressionType, length);
+        this.payloadDecompressed = new Payload(length, decompressed);
+    }
+
+    private boolean isReady() {
+        return (header.ready && payloadDecompressed.ready);
+    }
+
+    @Compression
+    public void writeFully(DataOutputStream out) throws IOException {
+        Preconditions.checkNotNull(out);
+        if (!isReady()) {
+            throw new IOException("Entry not writable");
+        }
+        // Version
+        out.writeByte(version);
+        // Header
+        header.write(out);
+        // Compress
+        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
+        byte[] compressed = codec.compress(
+                payloadDecompressed.payload,
+                0,
+                payloadDecompressed.length,
+                compressionStat);
+        this.payloadCompressed = new Payload(compressed.length, compressed);
+        this.compressedEntryBytes.add(payloadCompressed.length);
+        this.decompressedEntryBytes.add(payloadDecompressed.length);
+        payloadCompressed.write(out);
+    }
+
+    @Compression
+    public void readFully(DataInputStream in) throws IOException {
+        Preconditions.checkNotNull(in);
+        // Make sure we're reading the right versioned entry.
+        byte version = in.readByte();
+        if (version != this.version) {
+            throw new IOException(String.format("Version mismatch while reading. Received: %d," +
+                    " Required: %d", version, this.version));
+        }
+        header.read(in);
+        payloadCompressed.read(in);
+        // Decompress
+        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
+        byte[] decompressed = codec.decompress(
+                payloadCompressed.payload,
+                0,
+                payloadCompressed.length,
+                header.decompressedSize,
+                decompressionStat);
+        this.payloadDecompressed = new Payload(decompressed.length, decompressed);
+        this.compressedEntryBytes.add(payloadCompressed.length);
+        this.decompressedEntryBytes.add(payloadDecompressed.length);
+    }
+
+    public byte[] getDecompressedPayload() throws IOException {
+        if (!isReady()) {
+            throw new IOException("Decompressed payload is not initialized");
+        }
+        return payloadDecompressed.payload;
+    }
+
+    public static class Header {
+        public static final int COMPRESSION_CODEC_MASK = 0x3;
+        public static final int COMPRESSION_CODEC_NONE = 0x0;
+        public static final int COMPRESSION_CODEC_LZ4 = 0x1;
+
+        private int flags = 0;
+        private int decompressedSize = 0;
+        private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;
+
+        // Whether this struct is ready for reading/writing.
+        private boolean ready = false;
+
+        // Used while reading.
+        public Header() {
+        }
+
+        public Header(CompressionCodec.Type compressionType,
+                      int decompressedSize) {
+            this.compressionType = compressionType;
+            this.decompressedSize = decompressedSize;
+            this.flags = 0;
+            switch (compressionType) {
+                case NONE:
+                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
+                                                        COMPRESSION_CODEC_NONE);
+                    break;
+                case LZ4:
+                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
+                                                        COMPRESSION_CODEC_LZ4);
+                    break;
+                default:
+                    throw new RuntimeException(String.format("Unknown Compression Type: %s",
+                                                             compressionType));
+            }
+            // This can now be written.
+            this.ready = true;
+        }
+
+        private void write(DataOutputStream out) throws IOException {
+            out.writeInt(flags);
+            out.writeInt(decompressedSize);
+        }
+
+        private void read(DataInputStream in) throws IOException {
+            this.flags = in.readInt();
+            int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
+            if (compressionType == COMPRESSION_CODEC_NONE) {
+                this.compressionType = CompressionCodec.Type.NONE;
+            } else if (compressionType == COMPRESSION_CODEC_LZ4) {
+                this.compressionType = CompressionCodec.Type.LZ4;
+            } else {
+                throw new IOException(String.format("Unsupported Compression Type: %s",
+                                                    compressionType));
+            }
+            this.decompressedSize = in.readInt();
+            // Values can now be read.
+            this.ready = true;
+        }
+    }
+
+    public static class Payload {
+        private int length = 0;
+        private byte[] payload = null;
+
+        // Whether this struct is ready for reading/writing.
+        private boolean ready = false;
+
+        // Used for reading
+        Payload() {
+        }
+
+        Payload(int length, byte[] payload) {
+            this.length = length;
+            this.payload = payload;
+            this.ready = true;
+        }
+
+        private void write(DataOutputStream out) throws IOException {
+            out.writeInt(length);
+            out.write(payload, 0, length);
+        }
+
+        private void read(DataInputStream in) throws IOException {
+            this.length = in.readInt();
+            this.payload = new byte[length];
+            in.readFully(payload);
+            this.ready = true;
+        }
+    }
+
+    /**
+     * Return an InputStream that reads from the provided InputStream, decompresses the data
+     * and wraps the decompressed payload.
+     *
+     * Note that src is modified by this call.
+     *
+     * @return
+     *      New input stream with the decompressed payload.
+     * @throws IOException
+     */
+    public static InputStream fromInputStream(InputStream src,
+                                              StatsLogger statsLogger) throws IOException {
+        src.mark(VERSION_LENGTH);
+        byte version = new DataInputStream(src).readByte();
+        src.reset();
+        EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
+        entry.readFully(new DataInputStream(src));
+        return new ByteArrayInputStream(entry.getDecompressedPayload());
+    }
+
+}
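
For illustration only (not part of the commit): a round-trip sketch of the entry layout described above (version byte, header, payload), using the NONE codec so no compression library is involved.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.bookkeeper.stats.NullStatsLogger;
    import org.apache.distributedlog.EnvelopedEntry;
    import org.apache.distributedlog.io.CompressionCodec;

    public class EnvelopeRoundTripSketch {
        public static void main(String[] args) throws Exception {
            byte[] payload = "hello distributedlog".getBytes(StandardCharsets.UTF_8);

            // Serialize: version byte, header (flags + decompressed size), then the payload.
            EnvelopedEntry out = new EnvelopedEntry(
                    EnvelopedEntry.CURRENT_VERSION,
                    CompressionCodec.Type.NONE,
                    payload,
                    payload.length,
                    NullStatsLogger.INSTANCE);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            out.writeFully(new DataOutputStream(bos));

            // Deserialize: fromInputStream peeks the version, then reads the header and payload back.
            InputStream decoded = EnvelopedEntry.fromInputStream(
                    new ByteArrayInputStream(bos.toByteArray()), NullStatsLogger.INSTANCE);
            byte[] roundTripped = new byte[payload.length];
            int read = decoded.read(roundTripped);
            System.out.println(read + " bytes: " + new String(roundTripped, StandardCharsets.UTF_8));
        }
    }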

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
new file mode 100644
index 0000000..1761de5
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Record reader to read records from an enveloped entry buffer.
+ */
+class EnvelopedEntryReader implements Entry.Reader, RecordStream {
+
+    private final long logSegmentSeqNo;
+    private final long entryId;
+    private final LogRecord.Reader reader;
+
+    // slot id
+    private long slotId = 0;
+
+    EnvelopedEntryReader(long logSegmentSeqNo,
+                         long entryId,
+                         long startSequenceId,
+                         InputStream in,
+                         boolean envelopedEntry,
+                         boolean deserializeRecordSet,
+                         StatsLogger statsLogger)
+            throws IOException {
+        this.logSegmentSeqNo = logSegmentSeqNo;
+        this.entryId = entryId;
+        InputStream src = in;
+        if (envelopedEntry) {
+            src = EnvelopedEntry.fromInputStream(in, statsLogger);
+        }
+        this.reader = new LogRecord.Reader(
+                this,
+                new DataInputStream(src),
+                startSequenceId,
+                deserializeRecordSet);
+    }
+
+    @Override
+    public long getLSSN() {
+        return logSegmentSeqNo;
+    }
+
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+
+    @Override
+    public LogRecordWithDLSN nextRecord() throws IOException {
+        return reader.readOp();
+    }
+
+    @Override
+    public boolean skipTo(long txId) throws IOException {
+        return reader.skipTo(txId, true);
+    }
+
+    @Override
+    public boolean skipTo(DLSN dlsn) throws IOException {
+        return reader.skipTo(dlsn);
+    }
+
+    //
+    // Record Stream
+    //
+
+    @Override
+    public void advance(int numRecords) {
+        slotId += numRecords;
+    }
+
+    @Override
+    public DLSN getCurrentPosition() {
+        return new DLSN(logSegmentSeqNo, entryId, slotId);
+    }
+
+    @Override
+    public String getName() {
+        return "EnvelopedReader";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
new file mode 100644
index 0000000..54858d7
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.Entry.Writer;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+/**
+ * {@link org.apache.distributedlog.io.Buffer} based log record set writer.
+ */
+class EnvelopedEntryWriter implements Writer {
+
+    static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class);
+
+    private static class WriteRequest {
+
+        private final int numRecords;
+        private final Promise<DLSN> promise;
+
+        WriteRequest(int numRecords, Promise<DLSN> promise) {
+            this.numRecords = numRecords;
+            this.promise = promise;
+        }
+
+    }
+
+    private final String logName;
+    private final Buffer buffer;
+    private final LogRecord.Writer writer;
+    private final List<WriteRequest> writeRequests;
+    private final boolean envelopeBeforeTransmit;
+    private final CompressionCodec.Type codec;
+    private final StatsLogger statsLogger;
+    private int count = 0;
+    private boolean hasUserData = false;
+    private long maxTxId = Long.MIN_VALUE;
+
+    EnvelopedEntryWriter(String logName,
+                         int initialBufferSize,
+                         boolean envelopeBeforeTransmit,
+                         CompressionCodec.Type codec,
+                         StatsLogger statsLogger) {
+        this.logName = logName;
+        this.buffer = new Buffer(initialBufferSize * 6 / 5);
+        this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
+        this.writeRequests = new LinkedList<WriteRequest>();
+        this.envelopeBeforeTransmit = envelopeBeforeTransmit;
+        this.codec = codec;
+        this.statsLogger = statsLogger;
+    }
+
+    @Override
+    public synchronized void reset() {
+        cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
+        count = 0;
+        this.buffer.reset();
+    }
+
+    @Override
+    public synchronized void writeRecord(LogRecord record,
+                                         Promise<DLSN> transmitPromise)
+            throws LogRecordTooLongException, WriteException {
+        int logRecordSize = record.getPersistentSize();
+        if (logRecordSize > MAX_LOGRECORD_SIZE) {
+            throw new LogRecordTooLongException(
+                    "Log Record of size " + logRecordSize + " written when only "
+                            + MAX_LOGRECORD_SIZE + " is allowed");
+        }
+
+        try {
+            this.writer.writeOp(record);
+            int numRecords = 1;
+            if (!record.isControl()) {
+                hasUserData = true;
+            }
+            if (record.isRecordSet()) {
+                numRecords = LogRecordSet.numRecords(record);
+            }
+            count += numRecords;
+            writeRequests.add(new WriteRequest(numRecords, transmitPromise));
+            maxTxId = Math.max(maxTxId, record.getTransactionId());
+        } catch (IOException e) {
+            logger.error("Failed to append record to record set of {} : ",
+                    logName, e);
+            throw new WriteException(logName, "Failed to append record to record set of "
+                    + logName);
+        }
+    }
+
+    private synchronized void satisfyPromises(long lssn, long entryId) {
+        long nextSlotId = 0;
+        for (WriteRequest request : writeRequests) {
+            request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+            nextSlotId += request.numRecords;
+        }
+        writeRequests.clear();
+    }
+
+    private synchronized void cancelPromises(Throwable reason) {
+        for (WriteRequest request : writeRequests) {
+            request.promise.setException(reason);
+        }
+        writeRequests.clear();
+    }
+
+    @Override
+    public synchronized long getMaxTxId() {
+        return maxTxId;
+    }
+
+    @Override
+    public synchronized boolean hasUserRecords() {
+        return hasUserData;
+    }
+
+    @Override
+    public int getNumBytes() {
+        return buffer.size();
+    }
+
+    @Override
+    public synchronized int getNumRecords() {
+        return count;
+    }
+
+    @Override
+    public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
+        if (!envelopeBeforeTransmit) {
+            return buffer;
+        }
+        // We can't escape this allocation because things need to be read from one byte array
+        // and then written to another. This is the destination.
+        Buffer toSend = new Buffer(buffer.size());
+        byte[] decompressed = buffer.getData();
+        int length = buffer.size();
+        EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
+                                                  codec,
+                                                  decompressed,
+                                                  length,
+                                                  statsLogger);
+        // This will cause an allocation of a byte[] for compression. This can be avoided,
+        // but we can address that later if it becomes necessary.
+        entry.writeFully(new DataOutputStream(toSend));
+        return toSend;
+    }
+
+    @Override
+    public synchronized DLSN finalizeTransmit(long lssn, long entryId) {
+        return new DLSN(lssn, entryId, count - 1);
+    }
+
+    @Override
+    public void completeTransmit(long lssn, long entryId) {
+        satisfyPromises(lssn, entryId);
+    }
+
+    @Override
+    public void abortTransmit(Throwable reason) {
+        cancelPromises(reason);
+    }
+}
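
The transmit lifecycle implied by the methods above (writeRecord, getBuffer, completeTransmit/abortTransmit) can be summarized with a short sketch. The following code is illustrative only and not part of the patch: the sendToBookKeeper() transport call, the (lssn, entryId) pair and the surrounding class are hypothetical placeholders, and the sketch assumes it lives in the org.apache.distributedlog package so the writer type shown above is reachable.

package org.apache.distributedlog;

import java.io.IOException;

import com.twitter.util.Promise;

// Hedged sketch, not part of the patch: walks the record-set transmit lifecycle.
class EnvelopedEntryWriterTransmitSketch {

    void transmitOnce(EnvelopedEntryWriter writer, LogRecord record,
                      long lssn, long entryId) throws Exception {
        // 1. Buffer the record and register a promise that completes with its DLSN.
        Promise<DLSN> promise = new Promise<DLSN>();
        writer.writeRecord(record, promise);

        // 2. Fetch the (optionally enveloped/compressed) buffer and ship it.
        try {
            sendToBookKeeper(writer.getBuffer());
            // 3a. On success, satisfy the pending promises with DLSNs rooted
            //     at the entry that was just written.
            writer.completeTransmit(lssn, entryId);
        } catch (IOException ioe) {
            // 3b. On failure, fail every pending promise with the cause.
            writer.abortTransmit(ioe);
        }
    }

    // Hypothetical transport stand-in; a real writer hands the buffer to a
    // BookKeeper ledger handle instead.
    private void sendToBookKeeper(Object envelopedBuffer) throws IOException {
        // no-op in this sketch
    }
}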

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
new file mode 100644
index 0000000..f94495f
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LedgerReadPosition {
+    static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class);
+
+    private static enum PartialOrderingComparisonResult {
+        NotComparable,
+        GreaterThan,
+        LessThan,
+        EqualTo
+    }
+
+    long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID;
+    long logSegmentSequenceNo;
+    long entryId;
+
+    public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) {
+        this.ledgerId = ledgerId;
+        this.logSegmentSequenceNo = logSegmentSequenceNo;
+        this.entryId = entryId;
+    }
+
+    public LedgerReadPosition(LedgerReadPosition that) {
+        this.ledgerId = that.ledgerId;
+        this.logSegmentSequenceNo = that.logSegmentSequenceNo;
+        this.entryId = that.entryId;
+    }
+
+
+    public LedgerReadPosition(final DLSN dlsn) {
+        this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId());
+    }
+
+    public LedgerReadPosition(long logSegmentSequenceNo, long entryId) {
+        this.logSegmentSequenceNo = logSegmentSequenceNo;
+        this.entryId = entryId;
+    }
+
+    public long getLedgerId() {
+        if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) {
+            LOG.trace("Ledger Id is not initialized");
+            throw new IllegalStateException("Ledger Id is not initialized");
+        }
+        return ledgerId;
+    }
+
+    public long getLogSegmentSequenceNumber() {
+        return logSegmentSequenceNo;
+    }
+
+    public long getEntryId() {
+        return entryId;
+    }
+
+    public void advance() {
+        entryId++;
+    }
+
+    public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) {
+        this.ledgerId = ledgerId;
+        this.logSegmentSequenceNo = logSegmentSequenceNo;
+        this.entryId = 0L;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId);
+    }
+
+    public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) {
+        PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
+        return ((result == PartialOrderingComparisonResult.LessThan) ||
+            (result == PartialOrderingComparisonResult.EqualTo));
+    }
+
+    public boolean definitelyLessThan(LedgerReadPosition threshold) {
+        PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
+        return result == PartialOrderingComparisonResult.LessThan;
+    }
+
+    private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) {
+        // If no threshold is passed we cannot make a definitive comparison
+        if (null == threshold) {
+            return PartialOrderingComparisonResult.NotComparable;
+        }
+
+        if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) {
+            if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) {
+                return PartialOrderingComparisonResult.LessThan;
+            } else {
+                return PartialOrderingComparisonResult.GreaterThan;
+            }
+        } else if (this.ledgerId != threshold.ledgerId) {
+            // When the logSegmentSequenceNo is equal we cannot definitively say that this
+            // position is less than the threshold unless the ledgerIds are also equal,
+            // since the LogSegmentSequenceNumber may be inferred from transactionIds in older
+            // versions of the metadata.
+            return PartialOrderingComparisonResult.NotComparable;
+        } else if (this.getEntryId() < threshold.getEntryId()) {
+            return PartialOrderingComparisonResult.LessThan;
+        } else if (this.getEntryId() > threshold.getEntryId()) {
+            return PartialOrderingComparisonResult.GreaterThan;
+        } else {
+            return PartialOrderingComparisonResult.EqualTo;
+        }
+    }
+
+    /**
+     * Comparator for the key portion
+     */
+    public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator();
+
+    // Only compares the key portion
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof LedgerReadPosition)) {
+            return false;
+        }
+        LedgerReadPosition key = (LedgerReadPosition) other;
+        return ledgerId == key.ledgerId &&
+            entryId == key.entryId;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) (ledgerId * 13 ^ entryId * 17);
+    }
+
+    /**
+     * Comparator that orders {@link LedgerReadPosition} by its key portion (ledgerId, entryId).
+     */
+    protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable {
+
+        private static final long serialVersionUID = 0L;
+
+        @Override
+        public int compare(LedgerReadPosition left, LedgerReadPosition right) {
+            long ret = left.ledgerId - right.ledgerId;
+            if (ret == 0) {
+                ret = left.entryId - right.entryId;
+            }
+            return (ret < 0) ? -1 : ((ret > 0) ? 1 : 0);
+        }
+    }
+
+}
+
+
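
Because comparePartiallyOrdered only defines a partial order, a short example helps make the semantics concrete. The snippet below is an illustrative sketch, not part of the patch; the class name and the literal ledger/sequence/entry ids are made up.

package org.apache.distributedlog;

// Hedged sketch, not part of the patch: demonstrates the partial ordering above.
public class LedgerReadPositionOrderingExample {
    public static void main(String[] args) {
        LedgerReadPosition a = new LedgerReadPosition(100L, 5L, 10L); // ledgerId, lssn, entryId
        LedgerReadPosition b = new LedgerReadPosition(200L, 5L, 20L); // same lssn, different ledger
        LedgerReadPosition c = new LedgerReadPosition(100L, 6L, 0L);  // later log segment

        // Same log segment sequence number but different ledger ids: not comparable,
        // so neither position is "definitely" less than the other.
        System.out.println(a.definitelyLessThan(b)); // false
        System.out.println(b.definitelyLessThan(a)); // false

        // Different log segment sequence numbers order the positions definitively.
        System.out.println(a.definitelyLessThan(c)); // true

        // Equal positions compare as EqualTo.
        System.out.println(a.definitelyLessThanOrEqualTo(new LedgerReadPosition(a))); // true
    }
}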

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
new file mode 100644
index 0000000..5623525
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class for setting up BookKeeper ensembles
+ * and bringing individual bookies up and down.
+ */
+public class LocalDLMEmulator {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class);
+
+    public static final String DLOG_NAMESPACE = "/messaging/distributedlog";
+
+    private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports
+    private static final int DEFAULT_ZK_TIMEOUT_SEC = 10;
+    private static final int DEFAULT_ZK_PORT = 2181;
+    private static final String DEFAULT_ZK_HOST = "127.0.0.1";
+    private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT;
+    private static final int DEFAULT_NUM_BOOKIES = 3;
+    private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration();
+
+    private final String zkEnsemble;
+    private final URI uri;
+    private final List<File> tmpDirs = new ArrayList<File>();
+    private final int zkTimeoutSec;
+    private final Thread bkStartupThread;
+    private final String zkHost;
+    private final int zkPort;
+    private final int numBookies;
+
+    public static class Builder {
+        private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
+        private int numBookies = DEFAULT_NUM_BOOKIES;
+        private String zkHost = DEFAULT_ZK_HOST;
+        private int zkPort = DEFAULT_ZK_PORT;
+        private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT;
+        private boolean shouldStartZK = true;
+        private Optional<ServerConfiguration> serverConf = Optional.absent();
+
+        public Builder numBookies(int numBookies) {
+            this.numBookies = numBookies;
+            return this;
+        }
+        public Builder zkHost(String zkHost) {
+            this.zkHost = zkHost;
+            return this;
+        }
+        public Builder zkPort(int zkPort) {
+            this.zkPort = zkPort;
+            return this;
+        }
+        public Builder zkTimeoutSec(int zkTimeoutSec) {
+            this.zkTimeoutSec = zkTimeoutSec;
+            return this;
+        }
+        public Builder initialBookiePort(int initialBookiePort) {
+            this.initialBookiePort = initialBookiePort;
+            return this;
+        }
+        public Builder shouldStartZK(boolean shouldStartZK) {
+            this.shouldStartZK = shouldStartZK;
+            return this;
+        }
+        public Builder serverConf(ServerConfiguration serverConf) {
+            this.serverConf = Optional.of(serverConf);
+            return this;
+        }
+
+        public LocalDLMEmulator build() throws Exception {
+            ServerConfiguration conf = null;
+            if (serverConf.isPresent()) {
+                conf = serverConf.get();
+            } else {
+                conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
+                conf.setZkTimeout(zkTimeoutSec * 1000);
+            }
+            ServerConfiguration newConf = new ServerConfiguration();
+            newConf.loadConf(conf);
+            newConf.setAllowLoopback(true);
+
+            return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort,
+                initialBookiePort, zkTimeoutSec, newConf);
+        }
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception {
+        this.numBookies = numBookies;
+        this.zkHost = zkHost;
+        this.zkPort = zkPort;
+        this.zkEnsemble = zkHost + ":" + zkPort;
+        this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE);
+        this.zkTimeoutSec = zkTimeoutSec;
+        this.bkStartupThread = new Thread() {
+            public void run() {
+                try {
+                    LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
+                    LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
+                    LOG.info("{} bookies are started.");
+                } catch (InterruptedException e) {
+                    // go away quietly
+                } catch (Exception e) {
+                    LOG.error("Error starting local bk", e);
+                }
+            }
+        };
+    }
+
+    public void start() throws Exception {
+        bkStartupThread.start();
+        if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) {
+            throw new Exception("Error starting zookeeper/bookkeeper");
+        }
+        int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
+        assert (numBookies == bookiesUp);
+        // Provision "/messaging/distributedlog" namespace
+        DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
+    }
+
+    public void teardown() throws Exception {
+        if (bkStartupThread != null) {
+            bkStartupThread.interrupt();
+            bkStartupThread.join();
+        }
+        for (File dir : tmpDirs) {
+            FileUtils.deleteDirectory(dir);
+        }
+    }
+
+    public String getZkServers() {
+        return zkEnsemble;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public BookieServer newBookie() throws Exception {
+        ServerConfiguration bookieConf = new ServerConfiguration();
+        bookieConf.setZkTimeout(zkTimeoutSec * 1000);
+        bookieConf.setBookiePort(0);
+        bookieConf.setAllowLoopback(true);
+        File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
+            "test");
+        if (!tmpdir.delete()) {
+            LOG.debug("Fail to delete tmpdir " + tmpdir);
+        }
+        if (!tmpdir.mkdir()) {
+            throw new IOException("Fail to create tmpdir " + tmpdir);
+        }
+        tmpDirs.add(tmpdir);
+
+        bookieConf.setZkServers(zkEnsemble);
+        bookieConf.setJournalDirName(tmpdir.getPath());
+        bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()});
+
+        BookieServer b = new BookieServer(bookieConf);
+        b.start();
+        for (int i = 0; i < 10 && !b.isRunning(); i++) {
+            Thread.sleep(10000);
+        }
+        if (!b.isRunning()) {
+            throw new IOException("Bookie would not start");
+        }
+        return b;
+    }
+
+    /**
+     * Check that a number of bookies are available.
+     *
+     * @param count number of bookies required
+     * @param timeout number of seconds to wait for the bookies to start
+     * @return the number of bookies found to be up when the check completes
+     * @throws Exception if an error occurs while checking the available bookies
+     */
+    public int checkBookiesUp(int count, int timeout) throws Exception {
+        ZooKeeper zkc = connectZooKeeper(zkHost, zkPort, zkTimeoutSec);
+        try {
+            int mostRecentSize = 0;
+            for (int i = 0; i < timeout; i++) {
+                try {
+                    List<String> children = zkc.getChildren("/ledgers/available",
+                        false);
+                    children.remove("readonly");
+                    mostRecentSize = children.size();
+                    if ((mostRecentSize > count) || LOG.isDebugEnabled()) {
+                        LOG.info("Found " + mostRecentSize + " bookies up, "
+                            + "waiting for " + count);
+                        if ((mostRecentSize > count) || LOG.isTraceEnabled()) {
+                            for (String child : children) {
+                                LOG.info(" server: " + child);
+                            }
+                        }
+                    }
+                    if (mostRecentSize == count) {
+                        break;
+                    }
+                } catch (KeeperException e) {
+                    // ignore
+                }
+                Thread.sleep(1000);
+            }
+            return mostRecentSize;
+        } finally {
+            zkc.close();
+        }
+    }
+
+    public static String getBkLedgerPath() {
+        return "/ledgers";
+    }
+
+    public static ZooKeeper connectZooKeeper(String zkHost, int zkPort)
+        throws IOException, KeeperException, InterruptedException {
+            return connectZooKeeper(zkHost, zkPort, DEFAULT_ZK_TIMEOUT_SEC);
+    }
+
+    public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec)
+        throws IOException, KeeperException, InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final String zkHostPort = zkHost + ":" + zkPort;
+
+        ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() {
+            public void process(WatchedEvent event) {
+                if (event.getState() == Event.KeeperState.SyncConnected) {
+                    latch.countDown();
+                }
+            }
+        });
+        if (!latch.await(zkTimeoutSec, TimeUnit.SECONDS)) {
+            throw new IOException("Zookeeper took too long to connect");
+        }
+        return zkc;
+    }
+
+    public static URI createDLMURI(String path) throws Exception {
+        return createDLMURI(DEFAULT_ZK_ENSEMBLE, path);
+    }
+
+    public static URI createDLMURI(String zkServers, String path) throws Exception {
+        return URI.create("distributedlog://" + zkServers + DLOG_NAMESPACE + path);
+    }
+
+    /**
+     * Try to start ZooKeeper locally on any port.
+     */
+    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception {
+        return runZookeeperOnAnyPort((int) (Math.random()*10000+7000), zkDir);
+    }
+
+    /**
+     * Try to start ZooKeeper locally on any port, beginning with some base port.
+     * Retry on the next port when the bind fails.
+     */
+    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception {
+
+        final int MAX_RETRIES = 20;
+        final int MIN_PORT = 1025;
+        final int MAX_PORT = 65535;
+        ZooKeeperServerShim zks = null;
+        int zkPort = basePort;
+        boolean success = false;
+        int retries = 0;
+
+        while (!success) {
+            try {
+                LOG.info("zk trying to bind to port " + zkPort);
+                zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkDir);
+                success = true;
+            } catch (BindException be) {
+                retries++;
+                if (retries > MAX_RETRIES) {
+                    throw be;
+                }
+                zkPort++;
+                if (zkPort > MAX_PORT) {
+                    zkPort = MIN_PORT;
+                }
+            }
+        }
+
+        return Pair.of(zks, zkPort);
+    }
+
+    public static void main(String[] args) throws Exception {
+        try {
+            if (args.length < 1) {
+                System.out.println("Usage: LocalDLEmulator <zk_port>");
+                System.exit(-1);
+            }
+
+            final int zkPort = Integer.parseInt(args[0]);
+            final File zkDir = IOUtils.createTempDir("distrlog", "zookeeper");
+            final LocalDLMEmulator localDlm = LocalDLMEmulator.newBuilder()
+                .zkPort(zkPort)
+                .build();
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        localDlm.teardown();
+                        FileUtils.deleteDirectory(zkDir);
+                        System.out.println("ByeBye!");
+                    } catch (Exception e) {
+                        // do nothing
+                    }
+                }
+            });
+            localDlm.start();
+
+            System.out.println(String.format(
+                "DistributedLog Sandbox is running now. You could access distributedlog://%s:%s",
+                DEFAULT_ZK_HOST,
+                zkPort));
+        } catch (Exception ex) {
+            System.out.println("Exception occurred running emulator " + ex);
+        }
+    }
+}
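
For readers skimming the diff, the builder above is typically driven as in the following sketch. It is illustrative only and not part of the patch; the three-bookie configuration and the ZooKeeper host/port are assumed values, and the namespace client that would consume the returned URI is elided.

package org.apache.distributedlog;

import java.net.URI;

// Hedged sketch, not part of the patch: stands up a local DL sandbox with the
// builder shown above and tears it down afterwards.
public class LocalDLMEmulatorSketch {
    public static void main(String[] args) throws Exception {
        LocalDLMEmulator emulator = LocalDLMEmulator.newBuilder()
                .numBookies(3)          // assumed bookie count
                .zkHost("127.0.0.1")
                .zkPort(2181)
                .shouldStartZK(true)    // let the emulator start ZooKeeper as well
                .build();
        try {
            emulator.start();           // boots the bookies and provisions the DL namespace
            URI uri = emulator.getUri();
            System.out.println("Local DL namespace available at " + uri);
            // ... build a namespace / DistributedLogManager against `uri` here ...
        } finally {
            emulator.teardown();        // stops the bookies and removes temp dirs
        }
    }
}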

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
new file mode 100644
index 0000000..75a32ef
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
+ *
+ * <h3>Lifecycle of a Reader</h3>
+ *
+ * A reader is a <i>sequential</i> reader that reads records from a DL log starting
+ * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)})
+ * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}).
+ * <p>
+ * After the reader is opened, the application could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
+ * to read records out of the log from the provided position.
+ * <p>
+ * Closing the reader (via {@link #close()}) will release all the resources occupied
+ * by this reader instance.
+ * <p>
+ * Exceptions could be thrown while reading records. Once an exception is thrown,
+ * the reader is set to an error state and it isn't usable anymore. It is the application's
+ * responsibility to handle the exceptions and re-create readers if necessary.
+ * <p>
+ * Example:
+ * <pre>
+ * DistributedLogManager dlm = ...;
+ * long nextTxId = ...;
+ * LogReader reader = dlm.getInputStream(nextTxId);
+ *
+ * while (true) { // keep reading & processing records
+ *     LogRecord record;
+ *     try {
+ *         record = reader.readNext(false);
+ *         nextTxId = record.getTransactionId();
+ *         // process the record
+ *         ...
+ *     } catch (IOException ioe) {
+ *         // handle the exception
+ *         ...
+ *         reader = dlm.getInputStream(nextTxId + 1);
+ *     }
+ * }
+ *
+ * </pre>
+ *
+ * <h3>Read Records</h3>
+ *
+ * Reading records from an <i>endless</i> log in a `synchronous` way isn't as
+ * trivial as in an `asynchronous` way (via {@link AsyncLogReader}), because it
+ * lacks a callback mechanism. LogReader introduces a flag `nonBlocking` to
+ * control the <i>waiting</i> behavior of `synchronous` reads.
+ *
+ * <h4>Blocking vs NonBlocking</h4>
+ *
+ * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
+ * before the read calls return, while <i>NonBlocking</i> (nonBlocking = true)
+ * means the reads will only check the readahead cache and return whatever records
+ * are available in the readahead cache.
+ * <p>
+ * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
+ * catching up with the writer (there are records in the log), the read call will
+ * wait until records are read and returned. If the reader is caught up with the
+ * writer (there are no more records in the log at read time), the read call
+ * will wait for a small period of time (defined in
+ * {@link DistributedLogConfiguration#getReadAheadWaitTime()}) and return whatever
+ * records are available in the readahead cache. In other words, if a reader sees
+ * no record on blocking reads, it means the reader is `caught-up` with the
+ * writer.
+ * <p>
+ * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
+ * state machines. Applications could use <i>blocking</i> reads until they are
+ * caught up with the latest data. Once caught up, they could start serving their
+ * service, switch to <i>non-blocking</i> read mode and tail read data from the logs.
+ * <p>
+ * See examples below.
+ *
+ * <h4>Read Single Record</h4>
+ *
+ * {@link #readNext(boolean)} reads individual records from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ *
+ * // keep reading records in blocking way until no records available in the log
+ * LogRecord record = reader.readNext(false);
+ * while (null != record) {
+ *     // process the record
+ *     ...
+ *     // read next record
+ *     record = reader.readNext(false);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     record = reader.readNext(true);
+ *     // process the new records
+ *     ...
+ * }
+ * </pre>
+ *
+ * <h4>Read Batch of Records</h4>
+ *
+ * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
+ * from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ * int N = 10;
+ *
+ * // keep reading N records in blocking way until no records available in the log
+ * List<LogRecord> records = reader.readBulk(false, N);
+ * while (!records.isEmpty()) {
+ *     // process the list of records
+ *     ...
+ *     if (records.size() < N) { // no more records available in the log
+ *         break;
+ *     }
+ *     // read next N records
+ *     records = reader.readBulk(false, N);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     records = reader.readBulk(true, N);
+ *     // process the new records
+ *     ...
+ * }
+ *
+ * </pre>
+ *
+ * @see AsyncLogReader
+ *
+ * NOTE:
+ * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
+ *    the {@link AsyncCloseable} interface so the reader can be closed asynchronously.
+ */
+public interface LogReader extends Closeable, AsyncCloseable {
+
+    /**
+     * Read the next log record from the stream.
+     * <p>
+     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
+     * records from the read ahead cache. It returns <i>null</i> if there aren't any records
+     * available in the read ahead cache.
+     * <p>
+     * If <i>nonBlocking</i> is set to false, the call blocks: it will block until a record is
+     * returned if there are records in the stream (aka catching up).
+     * Otherwise it will wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
+     * milliseconds and return null if there aren't any more records in the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @return an operation from the stream or null if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     */
+    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
+
+    /**
+     * Read the next <i>numLogRecords</i> log records from the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @param numLogRecords maximum number of log records returned by this call.
+     * @return an operation from the stream or empty list if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     * @see #readNext(boolean)
+     */
+    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
+}
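
To complement the javadoc above, here is a compilable rendering of the catch-up-then-tail pattern it describes. This is a hedged sketch, not part of the patch: the DistributedLogManager instance, nextTxId, the keepTailing flag and processRecord() are application-supplied placeholders.

package org.apache.distributedlog;

import java.io.IOException;

// Hedged sketch, not part of the patch: blocking reads until caught up with
// the writer, then non-blocking reads to tail the log, as described in the
// LogReader javadoc above.
class LogReaderTailingSketch {

    volatile boolean keepTailing = true; // application-controlled flag

    void catchUpThenTail(DistributedLogManager dlm, long nextTxId) throws IOException {
        LogReader reader = dlm.getInputStream(nextTxId);
        try {
            // Phase 1: blocking reads. readNext(false) returns null once the
            // reader has caught up with the writer.
            LogRecordWithDLSN record = reader.readNext(false);
            while (null != record) {
                processRecord(record);
                record = reader.readNext(false);
            }

            // Phase 2: non-blocking reads, polling the readahead cache to tail
            // newly written records.
            while (keepTailing) {
                record = reader.readNext(true);
                if (null != record) {
                    processRecord(record);
                }
            }
        } finally {
            reader.close();
        }
    }

    private void processRecord(LogRecordWithDLSN record) {
        // application-specific processing goes here
    }
}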