You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:29 UTC

[15/31] incubator-distributedlog git commit: DL-159: ReadAhead Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader interface

DL-159: ReadAhead Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader interface

Provide a new ReadAhead reader using the log segment entry reader interface. It does reading entries in a batch in parallel for batches, rather than reading entries in batch by batch. This would help mitigate the slow bookie problem.

The core change is the new ReadAheadEntryReader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7a977972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7a977972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7a977972

Branch: refs/heads/master
Commit: 7a977972200da9e86f4557bb002ce16cb60d236a
Parents: 27c04f3
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Dec 28 15:09:38 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:11:40 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    | 198 ++--
 .../distributedlog/BKDistributedLogManager.java |  54 +-
 .../distributedlog/BKLogReadHandler.java        |  90 +-
 .../distributedlog/BKSyncLogReaderDLSN.java     |  80 +-
 .../java/com/twitter/distributedlog/Entry.java  |  14 +
 .../twitter/distributedlog/EntryPosition.java   |  63 ++
 .../distributedlog/EnvelopedEntryReader.java    |  10 +
 .../com/twitter/distributedlog/MaxTxId.java     |   2 +-
 .../distributedlog/ReadAheadEntryReader.java    | 966 +++++++++++++++++++
 .../logsegment/BKLogSegmentEntryReader.java     | 118 ++-
 .../impl/logsegment/BKLogSegmentEntryStore.java | 127 +++
 .../injector/AsyncFailureInjector.java          |   7 +-
 .../injector/AsyncRandomFailureInjector.java    |  15 +-
 .../logsegment/LogSegmentEntryReader.java       |  21 +
 .../logsegment/LogSegmentEntryStore.java        |   4 +-
 .../rate/MovingAverageRateFactory.java          |   2 +-
 .../readahead/ReadAheadWorker.java              |  12 +-
 .../distributedlog/util/OrderedScheduler.java   |   8 +
 .../distributedlog/TestAsyncReaderLock.java     |   2 +-
 .../distributedlog/TestAsyncReaderWriter.java   |  39 +-
 .../distributedlog/TestBKSyncLogReader.java     |   5 +-
 .../distributedlog/TestEntryPosition.java       |  59 ++
 .../TestNonBlockingReadsMultiReader.java        |   5 +-
 .../twitter/distributedlog/TestReadAhead.java   | 158 ---
 .../TestReadAheadEntryReader.java               | 423 ++++++++
 .../com/twitter/distributedlog/TestReader.java  |   4 +
 .../distributedlog/TestRollLogSegments.java     |  12 +-
 .../twitter/distributedlog/TestTruncate.java    |   1 -
 .../logsegment/TestBKLogSegmentEntryReader.java |  38 +-
 .../LogSegmentIsTruncatedException.java         |  33 +
 .../src/main/thrift/service.thrift              |   2 +
 31 files changed, 2114 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 2ca064c..e347012 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
@@ -27,9 +28,7 @@ import com.twitter.distributedlog.exceptions.IdleReaderException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.ReadCancelledException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
-import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
@@ -48,10 +47,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
 import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
 
 /**
  * BookKeeper based {@link AsyncLogReader} implementation.
@@ -83,7 +84,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
             };
 
     protected final BKDistributedLogManager bkDistributedLogManager;
-    protected final BKLogReadHandler bkLedgerManager;
+    protected final BKLogReadHandler readHandler;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -92,19 +93,19 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
     final private Stopwatch scheduleDelayStopwatch;
     final private Stopwatch readNextDelayStopwatch;
     private DLSN startDLSN;
-    private boolean readAheadStarted = false;
+    private ReadAheadEntryReader readAheadReader = null;
     private int lastPosition = 0;
     private final boolean positionGapDetectionEnabled;
     private final int idleErrorThresholdMillis;
     final ScheduledFuture<?> idleReaderTimeoutTask;
     private ScheduledFuture<?> backgroundScheduleTask = null;
+    // last process time
+    private final Stopwatch lastProcessTime;
 
     protected Promise<Void> closeFuture = null;
 
     private boolean lockStream = false;
 
-    private boolean disableReadAheadLogSegmentsNotification = false;
-
     private final boolean returnEndOfStreamRecord;
 
     private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
@@ -122,7 +123,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
     private LogRecordWithDLSN nextRecord = null;
 
     // Failure Injector
-    private final AsyncFailureInjector failureInjector;
     private boolean disableProcessingReadRequests = false;
 
     // Stats
@@ -198,7 +198,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
         void complete() {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("{} : Satisfied promise with {} records", bkLedgerManager.getFullyQualifiedName(), records.size());
+                LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
             }
             delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
             Stopwatch stopwatch = Stopwatch.createStarted();
@@ -212,12 +212,11 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                          DLSN startDLSN,
                          Optional<String> subscriberId,
                          boolean returnEndOfStreamRecord,
-                         boolean deserializeRecordSet,
                          StatsLogger statsLogger) {
         this.bkDistributedLogManager = bkdlm;
         this.executorService = executorService;
-        this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
-                this, deserializeRecordSet, true);
+        this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+                this, true);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -226,16 +225,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
         this.returnEndOfStreamRecord = returnEndOfStreamRecord;
 
-        // Failure Injection
-        this.failureInjector = AsyncRandomFailureInjector.newBuilder()
-                .injectDelays(bkdlm.getConf().getEIInjectReadAheadDelay(),
-                              bkdlm.getConf().getEIInjectReadAheadDelayPercent(),
-                              bkdlm.getConf().getEIInjectMaxReadAheadDelayMs())
-                .injectErrors(false, 10)
-                .injectStops(bkdlm.getConf().getEIInjectReadAheadStall(), 10)
-                .injectCorruption(bkdlm.getConf().getEIInjectReadAheadBrokenEntries())
-                .build();
-
         // Stats
         StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
         futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
@@ -252,6 +241,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         // Lock the stream if requested. The lock will be released when the reader is closed.
         this.lockStream = false;
         this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
+        this.lastProcessTime = Stopwatch.createStarted();
     }
 
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
@@ -276,7 +266,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                         return;
                     }
 
-                    ReadAheadCache cache = bkLedgerManager.getReadAheadCache();
+                    ReadAheadEntryReader readAheadReader = getReadAheadReader();
 
                     // read request has been idle
                     //   - cache has records but read request are idle,
@@ -284,32 +274,27 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                     //   - cache is empty and readahead is idle (no records added for a long time)
                     idleReaderCheckIdleReadAheadCount.inc();
                     try {
-                        if (!hasMoreRecords(cache)
-                                && !cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                        if (null == readAheadReader || (!hasMoreRecords() &&
+                                readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+                            markReaderAsIdle();
                             return;
+                        } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+                            markReaderAsIdle();;
                         }
                     } catch (IOException e) {
-                        // we encountered exceptions on checking more records
                         setLastException(e);
                         return;
                     }
-
-                    idleReaderError.inc();
-                    IdleReaderException ire = new IdleReaderException("Reader on stream "
-                            + bkLedgerManager.getFullyQualifiedName()
-                            + " is idle for " + idleErrorThresholdMillis +" ms");
-                    setLastException(ire);
-                    // cancel all pending reads directly rather than notifying on error
-                    // because idle reader could happen on idle read requests that usually means something wrong
-                    // in scheduling reads
-                    cancelAllPendingReads(ire);
                 }
             }, period, period, TimeUnit.MILLISECONDS);
         }
-
         return null;
     }
 
+    synchronized ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
     void cancelIdleReaderTask() {
         // Do this after we have checked that the reader was not previously closed
         try {
@@ -317,12 +302,24 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                 idleReaderTimeoutTask.cancel(true);
             }
         } catch (Exception exc) {
-            LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
+            LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName());
         }
     }
 
+    private void markReaderAsIdle() {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for " + idleErrorThresholdMillis +" ms");
+        setLastException(ire);
+        // cancel all pending reads directly rather than notifying on error
+        // because idle reader could happen on idle read requests that usually means something wrong
+        // in scheduling reads
+        cancelAllPendingReads(ire);
+    }
+
     protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
-        if (readAheadStarted) {
+        if (null != readAheadReader) {
             throw new UnexpectedException("Could't reset from dlsn after reader already starts reading.");
         }
         startDLSN = fromDLSN;
@@ -335,14 +332,14 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
     public Future<Void> lockStream() {
         this.lockStream = true;
-        return bkLedgerManager.lockStream();
+        return readHandler.lockStream();
     }
 
     private boolean checkClosedOrInError(String operation) {
         if (null == lastException.get()) {
             try {
-                if (null != bkLedgerManager && null != bkLedgerManager.readAheadWorker) {
-                    bkLedgerManager.readAheadWorker.checkClosedOrInError();
+                if (null != readHandler && null != getReadAheadReader()) {
+                    getReadAheadReader().checkLastException();
                 }
 
                 bkDistributedLogManager.checkClosedOrInError(operation);
@@ -353,7 +350,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
         if (lockStream) {
             try {
-                bkLedgerManager.checkReadLock();
+                readHandler.checkReadLock();
             } catch (IOException ex) {
                 setLastException(ex);
             }
@@ -411,28 +408,44 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         readNextDelayStopwatch.reset().start();
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
 
-        if (!readAheadStarted) {
-            bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    try {
-                        bkLedgerManager.startReadAhead(
-                                new LedgerReadPosition(getStartDLSN()),
-                                failureInjector);
-                        if (disableReadAheadLogSegmentsNotification) {
-                            bkLedgerManager.disableReadAheadLogSegmentsNotification();
+        if (null == readAheadReader) {
+            try {
+                final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+                        getStreamName(),
+                        getStartDLSN(),
+                        bkDistributedLogManager.getConf(),
+                        readHandler,
+                        bkDistributedLogManager.getReaderEntryStore(),
+                        bkDistributedLogManager.getScheduler(),
+                        Ticker.systemTicker(),
+                        bkDistributedLogManager.alertStatsLogger);
+                readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        try {
+                            readHandler.registerListener(readAheadEntryReader);
+                            readHandler.asyncStartFetchLogSegments()
+                                    .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                                        @Override
+                                        public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                                            readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this);
+                                            readAheadEntryReader.start(logSegments.getValue());
+                                            return BoxedUnit.UNIT;
+                                        }
+                                    });
+                        } catch (Exception exc) {
+                            notifyOnError(exc);
                         }
-                    } catch (Exception exc) {
-                        notifyOnError(exc);
                     }
-                }
 
-                @Override
-                public void onFailure(Throwable cause) {
-                    notifyOnError(cause);
-                }
-            });
-            readAheadStarted = true;
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        notifyOnError(cause);
+                    }
+                });
+            } catch (IOException ioe) {
+                notifyOnError(ioe);
+            }
         }
 
         if (checkClosedOrInError("readNext")) {
@@ -475,7 +488,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                 return closeFuture;
             }
             closePromise = closeFuture = new Promise<Void>();
-            exception = new ReadCancelledException(bkLedgerManager.getFullyQualifiedName(), "Reader was closed");
+            exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
             setLastException(exception);
         }
 
@@ -490,7 +503,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
         cancelAllPendingReads(exception);
 
-        FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
+        ReadAheadEntryReader readAheadReader = getReadAheadReader();
+        if (null != readAheadReader) {
+            readHandler.unregisterListener(readAheadReader);
+            readAheadReader.removeStateChangeNotification(this);
+        }
+        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
         return closePromise;
     }
 
@@ -501,25 +522,26 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         pendingRequests.clear();
     }
 
-    boolean hasMoreRecords() throws IOException {
-        return hasMoreRecords(bkLedgerManager.readAheadCache);
-    }
-
-    private synchronized boolean hasMoreRecords(ReadAheadCache cache) throws IOException {
-        if (cache.getNumCachedEntries() > 0 || null != nextRecord) {
+    synchronized boolean hasMoreRecords() throws IOException {
+        if (null == readAheadReader) {
+            return false;
+        }
+        if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
             return true;
         } else if (null != currentEntry) {
             nextRecord = currentEntry.nextRecord();
             return null != nextRecord;
-        } else {
-            return false;
         }
+        return false;
     }
 
     private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+        if (null == readAheadReader) {
+            return null;
+        }
         if (null == currentEntry) {
-            currentEntry = bkLedgerManager.getNextReadAheadEntry();
-            // no current entry after reading from read head then return null
+            currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+            // no entry after reading from read ahead then return null
             if (null == currentEntry) {
                 return null;
             }
@@ -551,10 +573,10 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
             Stopwatch runTime = Stopwatch.createStarted();
             int iterations = 0;
             long scheduleCountLocal = scheduleCount.get();
-            LOG.debug("{}: Scheduled Background Reader", bkLedgerManager.getFullyQualifiedName());
+            LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
             while(true) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("{}: Executing Iteration: {}", bkLedgerManager.getFullyQualifiedName(), iterations++);
+                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
                 }
 
                 PendingReadRequest nextRequest = null;
@@ -563,31 +585,32 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
                     // Queue is empty, nothing to read, return
                     if (null == nextRequest) {
-                        LOG.trace("{}: Queue Empty waiting for Input", bkLedgerManager.getFullyQualifiedName());
+                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
                         scheduleCount.set(0);
                         backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
                         return;
                     }
 
                     if (disableProcessingReadRequests) {
-                        LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
+                        LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
                         return;
                     }
                 }
+                lastProcessTime.reset().start();
 
                 // If the oldest pending promise is interrupted then we must mark
                 // the reader in error and abort all pending reads since we dont
                 // know the last consumed read
                 if (null == lastException.get()) {
                     if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                        setLastException(new DLInterruptedException("Interrupted on reading " + bkLedgerManager.getFullyQualifiedName() + " : ",
+                        setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
                                 nextRequest.getPromise().isInterrupted().get()));
                     }
                 }
 
                 if (checkClosedOrInError("readNext")) {
                     if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
-                        LOG.warn("{}: Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get());
+                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
                     }
                     backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
                     return;
@@ -595,7 +618,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
                 try {
                     // Fail 10% of the requests when asked to simulate errors
-                    if (failureInjector.shouldInjectErrors()) {
+                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
                         throw new IOException("Reader Simulated Exception");
                     }
                     LogRecordWithDLSN record;
@@ -609,15 +632,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                         } else {
                             if (record.isEndOfStream() && !returnEndOfStreamRecord) {
                                 setLastException(new EndOfStreamException("End of Stream Reached for "
-                                        + bkLedgerManager.getFullyQualifiedName()));
+                                        + readHandler.getFullyQualifiedName()));
                                 break;
                             }
 
                             // gap detection
                             if (recordPositionsContainsGap(record, lastPosition)) {
-                                bkDistributedLogManager.raiseAlert("Gap detected between records at dlsn = {}", record.getDlsn());
+                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
                                 if (positionGapDetectionEnabled) {
-                                    throw new DLIllegalStateException("Gap detected between records at dlsn = " + record.getDlsn());
+                                    throw new DLIllegalStateException("Gap detected between records at record = " + record);
                                 }
                             }
                             lastPosition = record.getLastPositionWithinLogSegment();
@@ -628,7 +651,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                 } catch (IOException exc) {
                     setLastException(exc);
                     if (!(exc instanceof LogNotFoundException)) {
-                        LOG.warn("{} : read with skip Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get());
+                        LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
                     }
                     continue;
                 }
@@ -709,13 +732,12 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
     @VisibleForTesting
     void simulateErrors() {
-        failureInjector.injectErrors(true);
+        bkDistributedLogManager.getFailureInjector().injectErrors(true);
     }
 
     @VisibleForTesting
     synchronized void disableReadAheadLogSegmentsNotification() {
-        disableReadAheadLogSegmentsNotification = true;
-        bkLedgerManager.disableReadAheadLogSegmentsNotification();
+        readHandler.disableReadAheadLogSegmentsNotification();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index cd3f359..4963787 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -34,6 +34,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
 import com.twitter.distributedlog.function.GetVersionedValueFunction;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.metadata.LogMetadataForReader;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
@@ -141,7 +145,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     private final FeatureProvider featureProvider;
     private final StatsLogger statsLogger;
     private final StatsLogger perLogStatsLogger;
-    private final AlertStatsLogger alertStatsLogger;
+    final AlertStatsLogger alertStatsLogger;
 
     // log stream metadata stores
     private final LogStreamMetadataStore writerMetadataStore;
@@ -159,6 +163,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     private final boolean ownWriterBKC;
     private final BookKeeperClientBuilder readerBKCBuilder;
     private final BookKeeperClient readerBKC;
+    private LogSegmentEntryStore readerEntryStore = null;
     private final boolean ownReaderBKC;
 
     //
@@ -176,6 +181,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     private final PendingReaders pendingReaders;
     private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
 
+    // Failure Injector
+    private final AsyncFailureInjector failureInjector;
+
     /**
      * Create a DLM for testing.
      *
@@ -303,6 +311,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         this.ledgerAllocator = ledgerAllocator;
         this.writeLimiter = writeLimiter;
 
+        // Failure Injection
+        this.failureInjector = AsyncRandomFailureInjector.newBuilder()
+                .injectDelays(conf.getEIInjectReadAheadDelay(),
+                              conf.getEIInjectReadAheadDelayPercent(),
+                              conf.getEIInjectMaxReadAheadDelayMs())
+                .injectErrors(false, 10)
+                .injectStops(conf.getEIInjectReadAheadStall(), 10)
+                .injectCorruption(conf.getEIInjectReadAheadBrokenEntries())
+                .build();
+
         if (null == writerMetadataStore) {
             this.writerMetadataStore = new ZKLogStreamMetadataStore(
                     clientId,
@@ -413,6 +431,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         return this.readerBKC;
     }
 
+    synchronized LogSegmentEntryStore getReaderEntryStore() throws IOException {
+        if (null == readerEntryStore) {
+            readerEntryStore = new BKLogSegmentEntryStore(
+                conf,
+                readerBKC.get(),
+                scheduler,
+                statsLogger,
+                failureInjector);
+        }
+        return this.readerEntryStore;
+    }
+
     @VisibleForTesting
     FuturePool getReaderFuturePool() {
         return this.readerFuturePool;
@@ -423,6 +453,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         return this.featureProvider;
     }
 
+    AsyncFailureInjector getFailureInjector() {
+        return this.failureInjector;
+    }
+
     private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
             boolean create, LogSegmentListener listener) {
         if (null == readHandlerForListener && create) {
@@ -432,7 +466,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
             readHandlerForListener.asyncStartFetchLogSegments();
             return readHandlerForListener;
         }
-        readHandlerForListener.registerListener(listener);
+        if (null != readHandlerForListener && null != listener) {
+            readHandlerForListener.registerListener(listener);
+        }
         return readHandlerForListener;
     }
 
@@ -493,13 +529,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         return createReadHandler(
                 subscriberId,
                 null,
-                true, /* deserialize record set */
                 isHandleForReading);
     }
 
     synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
                                                     AsyncNotification notification,
-                                                    boolean deserializeRecordSet,
                                                     boolean isHandleForReading) {
         LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier);
         return new BKLogReadHandler(
@@ -511,15 +545,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 readerMetadataStore,
                 logSegmentMetadataCache,
                 scheduler,
-                readAheadScheduler,
                 alertStatsLogger,
-                readAheadExceptionsLogger,
                 statsLogger,
                 perLogStatsLogger,
                 clientId,
                 notification,
-                isHandleForReading,
-                deserializeRecordSet);
+                isHandleForReading);
     }
 
     // Create Ledger Allocator
@@ -930,7 +961,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 fromDLSN,
                 subscriberId,
                 false,
-                dynConf.getDeserializeRecordSetOnReads(),
                 statsLogger);
         pendingReaders.add(reader);
         return Future.value(reader);
@@ -969,7 +999,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
                 subscriberId,
                 false,
-                dynConf.getDeserializeRecordSetOnReads(),
                 statsLogger);
         pendingReaders.add(reader);
         final Future<Void> lockFuture = reader.lockStream();
@@ -1046,15 +1075,14 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
 
     LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
             throws IOException {
-        LOG.info("Create async reader starting from {}", fromDLSN);
+        LOG.info("Create sync reader starting from {}", fromDLSN);
         checkClosedOrInError("getInputStream");
-        LogReader reader = new BKSyncLogReaderDLSN(
+        return new BKSyncLogReaderDLSN(
                 conf,
                 this,
                 fromDLSN,
                 fromTxnId,
                 statsLogger);
-        return reader;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 9cfe1a6..67c584c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
 import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -36,14 +35,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.readahead.ReadAheadWorker;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
@@ -111,13 +106,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
     protected final LogMetadataForReader logMetadataForReader;
-    protected final ReadAheadCache readAheadCache;
     protected final LedgerHandleCache handleCache;
 
-    protected final OrderedScheduler readAheadExecutor;
     protected final DynamicDistributedLogConfiguration dynConf;
-    protected ReadAheadWorker readAheadWorker = null;
-    private final boolean isHandleForReading;
 
     private final Optional<String> subscriberId;
     private DistributedLock readLock;
@@ -134,10 +125,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
             new Versioned<List<LogSegmentMetadata>>(null, Version.NEW);
 
     // stats
-    private final AlertStatsLogger alertStatsLogger;
-    private final StatsLogger handlerStatsLogger;
     private final StatsLogger perLogStatsLogger;
-    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
 
     /**
      * Construct a Bookkeeper journal manager.
@@ -150,15 +138,12 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                      LogStreamMetadataStore streamMetadataStore,
                      LogSegmentMetadataCache metadataCache,
                      OrderedScheduler scheduler,
-                     OrderedScheduler readAheadExecutor,
                      AlertStatsLogger alertStatsLogger,
-                     ReadAheadExceptionsLogger readAheadExceptionsLogger,
                      StatsLogger statsLogger,
                      StatsLogger perLogStatsLogger,
                      String clientId,
                      AsyncNotification readerStateNotification,
-                     boolean isHandleForReading,
-                     boolean deserializeRecordSet) {
+                     boolean isHandleForReading) {
         super(logMetadata,
                 conf,
                 bkcBuilder,
@@ -170,13 +155,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                 clientId);
         this.logMetadataForReader = logMetadata;
         this.dynConf = dynConf;
-        this.readAheadExecutor = readAheadExecutor;
-        this.alertStatsLogger = alertStatsLogger;
         this.perLogStatsLogger =
                 isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE;
-        this.handlerStatsLogger =
-                BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger);
-        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
         this.readerStateNotification = readerStateNotification;
 
         handleCache = LedgerHandleCache.newBuilder()
@@ -184,16 +164,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                 .conf(conf)
                 .statsLogger(statsLogger)
                 .build();
-        readAheadCache = new ReadAheadCache(
-                getFullyQualifiedName(),
-                alertStatsLogger,
-                readerStateNotification,
-                dynConf.getReadAheadMaxRecords(),
-                deserializeRecordSet,
-                Ticker.systemTicker());
-
         this.subscriberId = subscriberId;
-        this.isHandleForReading = isHandleForReading;
     }
 
     @VisibleForTesting
@@ -290,16 +261,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
             }
             lockToClose = readLock;
         }
-        return Utils.closeSequence(scheduler, readAheadWorker, lockToClose)
+        return Utils.closeSequence(scheduler, lockToClose)
                 .flatMap(new AbstractFunction1<Void, Future<Void>>() {
             @Override
             public Future<Void> apply(Void result) {
-                if (null != readAheadCache) {
-                    readAheadCache.clear();
-                }
-                if (null != readAheadWorker) {
-                    unregisterListener(readAheadWorker);
-                }
                 if (null != handleCache) {
                     handleCache.clear();
                 }
@@ -361,57 +326,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         });
     }
 
-    public void startReadAhead(LedgerReadPosition startPosition,
-                               AsyncFailureInjector failureInjector) {
-        if (null == readAheadWorker) {
-            readAheadWorker = new ReadAheadWorker(
-                    conf,
-                    dynConf,
-                    logMetadataForReader,
-                    this,
-                    readAheadExecutor,
-                    handleCache,
-                    startPosition,
-                    readAheadCache,
-                    isHandleForReading,
-                    readAheadExceptionsLogger,
-                    handlerStatsLogger,
-                    perLogStatsLogger,
-                    alertStatsLogger,
-                    failureInjector,
-                    readerStateNotification);
-            registerListener(readAheadWorker);
-            // start the readahead worker after the log segments are fetched
-            asyncStartFetchLogSegments().map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                    readAheadWorker.start(logSegments.getValue());
-                    return BoxedUnit.UNIT;
-                }
-            });
-        }
-    }
-
-    public boolean isReadAheadCaughtUp() {
-        return null != readAheadWorker && readAheadWorker.isCaughtUp();
-    }
-
-    public LedgerHandleCache getHandleCache() {
-        return handleCache;
-    }
-
-    public Entry.Reader getNextReadAheadEntry() throws IOException {
-        return readAheadCache.getNextReadAheadEntry();
-    }
-
-    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
-        return readAheadCache.getNextReadAheadEntry(waitTime, waitTimeUnit);
-    }
-
-    public ReadAheadCache getReadAheadCache() {
-        return readAheadCache;
-    }
-
     @VisibleForTesting
     void disableReadAheadLogSegmentsNotification() {
         logSegmentsNotificationDisabled = true;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index 0f6db75..adf49a1 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -19,14 +19,18 @@ package com.twitter.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
 import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -39,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
 
+    private final BKDistributedLogManager bkdlm;
     private final BKLogReadHandler readHandler;
     private final AtomicReference<IOException> readerException =
             new AtomicReference<IOException>(null);
@@ -48,6 +53,9 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
     private boolean positioned = false;
     private Entry.Reader currentEntry = null;
 
+    // readahead reader
+    ReadAheadEntryReader readAheadReader = null;
+
     // idle reader settings
     private final boolean shouldCheckIdleReader;
     private final int idleErrorThresholdMillis;
@@ -59,19 +67,19 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
                         BKDistributedLogManager bkdlm,
                         DLSN startDLSN,
                         Optional<Long> startTransactionId,
-                        StatsLogger statsLogger) {
+                        StatsLogger statsLogger) throws IOException {
+        this.bkdlm = bkdlm;
         this.readHandler = bkdlm.createReadHandler(
                 Optional.<String>absent(),
                 this,
-                conf.getDeserializeRecordSetOnReads(),
                 true);
         this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
         this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
         this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
         this.startTransactionId = startTransactionId;
-        readHandler.startReadAhead(
-                new LedgerReadPosition(startDLSN),
-                AsyncFailureInjector.NULL);
+
+        // start readahead
+        startReadAhead(startDLSN);
         if (!startTransactionId.isPresent()) {
             positioned = true;
         }
@@ -81,32 +89,55 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
         idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
     }
 
+    private void startReadAhead(DLSN startDLSN) throws IOException {
+        readAheadReader = new ReadAheadEntryReader(
+                    bkdlm.getStreamName(),
+                    startDLSN,
+                    bkdlm.getConf(),
+                    readHandler,
+                    bkdlm.getReaderEntryStore(),
+                    bkdlm.getScheduler(),
+                    Ticker.systemTicker(),
+                    bkdlm.alertStatsLogger);
+        readHandler.registerListener(readAheadReader);
+        readHandler.asyncStartFetchLogSegments()
+                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                        readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this);
+                        readAheadReader.start(logSegments.getValue());
+                        return BoxedUnit.UNIT;
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
     @VisibleForTesting
     BKLogReadHandler getReadHandler() {
         return readHandler;
     }
 
-    // reader is still catching up, waiting for next record
-
     private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
         Entry.Reader entry = null;
         if (nonBlocking) {
-            return readHandler.getNextReadAheadEntry();
+            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
         } else {
-            while (!readHandler.isReadAheadCaughtUp()
+            while (!readAheadReader.isReadAheadCaughtUp()
                     && null == readerException.get()
                     && null == entry) {
-                entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
-                        TimeUnit.MILLISECONDS);
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
             }
             if (null != entry) {
                 return entry;
             }
             // reader is caught up
-            if (readHandler.isReadAheadCaughtUp()
+            if (readAheadReader.isReadAheadCaughtUp()
                     && null == readerException.get()) {
-                entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
-                        TimeUnit.MILLISECONDS);
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
             }
             return entry;
         }
@@ -121,30 +152,24 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
         throw ire;
     }
 
-
     @Override
     public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
             throws IOException {
         if (null != readerException.get()) {
             throw readerException.get();
         }
-
         LogRecordWithDLSN record = doReadNext(nonBlocking);
-
         // no record is returned, check if the reader becomes idle
         if (null == record && shouldCheckIdleReader) {
-            ReadAheadCache cache = readHandler.getReadAheadCache();
-            if (cache.getNumCachedEntries() <= 0 &&
-                    cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+            if (readAheadReader.getNumCachedEntries() <= 0 &&
+                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
                 markReaderAsIdle();
             }
         }
-
         return record;
     }
 
-    private synchronized LogRecordWithDLSN doReadNext(boolean nonBlocking)
-            throws IOException {
+    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
         LogRecordWithDLSN record = null;
 
         do {
@@ -217,7 +242,12 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
             }
             closeFuture = closePromise = new Promise<Void>();
         }
-        readHandler.asyncClose().proxyTo(closePromise);
+        readHandler.unregisterListener(readAheadReader);
+        readAheadReader.removeStateChangeNotification(this);
+        Utils.closeSequence(bkdlm.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
         return closePromise;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
index b1bd701..bf315fc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
@@ -358,6 +358,20 @@ public class Entry {
     public interface Reader {
 
         /**
+         * Get the log segment sequence number.
+         *
+         * @return the log segment sequence number.
+         */
+        long getLSSN();
+
+        /**
+         * Return the entry id.
+         *
+         * @return the entry id.
+         */
+        long getEntryId();
+
+        /**
          * Read next log record from this record set.
          *
          * @return next log record from this record set.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
new file mode 100644
index 0000000..0a15d29
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+/**
+ * The position of an entry, identified by log segment sequence number and entry id.
+ */
+class EntryPosition {
+
+    private long lssn;
+    private long entryId;
+
+    EntryPosition(long lssn, long entryId) {
+        this.lssn = lssn;
+        this.entryId = entryId;
+    }
+
+    public synchronized long getLogSegmentSequenceNumber() {
+        return lssn;
+    }
+
+    public synchronized long getEntryId() {
+        return entryId;
+    }
+
+    public synchronized boolean advance(long lssn, long entryId) {
+        if (lssn == this.lssn) {
+            if (entryId <= this.entryId) {
+                return false;
+            }
+            this.entryId = entryId;
+            return true;
+        } else if (lssn > this.lssn) {
+            this.lssn = lssn;
+            this.entryId = entryId;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("(").append(lssn).append(", ").append(entryId).append(")");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
index 79e4408..038bb18 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
@@ -57,6 +57,16 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream {
     }
 
     @Override
+    public long getLSSN() {
+        return logSegmentSeqNo;
+    }
+
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+
+    @Override
     public LogRecordWithDLSN nextRecord() throws IOException {
         return reader.readOp();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index ed7218e..c3948df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -66,7 +66,7 @@ class MaxTxId {
     }
 
     public Versioned<Long> getVersionedData(long txId) {
-        return new Versioned<Long>(Math.max(txId, currentMax), version);
+        return new Versioned<Long>(Math.max(txId, get()), version);
     }
 
 }