You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:31 UTC

[17/31] incubator-distributedlog git commit: DL-160: Remove 'DLSN' suffix from async and sync readers

DL-160: Remove 'DLSN' suffix from async and sync readers


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c7751804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c7751804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c7751804

Branch: refs/heads/master
Commit: c7751804ef728c39d54b19e52f2db48d7fba9f65
Parents: 7a97797
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Dec 28 16:27:38 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:12:04 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReader.java        | 748 +++++++++++++++++++
 .../distributedlog/BKAsyncLogReaderDLSN.java    | 748 -------------------
 .../distributedlog/BKDistributedLogManager.java |   8 +-
 .../twitter/distributedlog/BKSyncLogReader.java | 276 +++++++
 .../distributedlog/BKSyncLogReaderDLSN.java     | 276 -------
 .../src/main/resources/findbugsExclude.xml      |   2 +-
 .../NonBlockingReadsTestUtil.java               |   6 +-
 .../distributedlog/TestAsyncReaderLock.java     |   6 +-
 .../distributedlog/TestAsyncReaderWriter.java   |  14 +-
 .../distributedlog/TestBKSyncLogReader.java     |   6 +-
 .../TestNonBlockingReadsMultiReader.java        |   2 +-
 .../com/twitter/distributedlog/TestReader.java  |   2 +-
 .../distributedlog/TestRollLogSegments.java     |   4 +-
 13 files changed, 1049 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
new file mode 100644
index 0000000..18d2e15
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.EndOfStreamException;
+import com.twitter.distributedlog.exceptions.IdleReaderException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Throw;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * BookKeeper based {@link AsyncLogReader} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `async_reader`.
+ * <ul>
+ * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
+ * if it is high, it means that the caller takes time on processing the result of read requests.
+ * The side effect is blocking subsequent reads.
+ * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
+ * <li> `async_reader`/background_read: opstats. time spent on background reads.
+ * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
+ * <li> `async_reader`/time_between_read_next: opstats. time spent between two consecutive {@link #readNext()} calls.
+ * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
+ * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
+ * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
+ * </ul>
+ */
+class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
+    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
+
+    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
+            new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { // record list -> its first record
+                @Override
+                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
+                    return records.get(0); // readNext() maps its one-record read through this function
+                }
+            };
+
+    protected final BKDistributedLogManager bkDistributedLogManager;
+    protected final BKLogReadHandler readHandler;
+    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); // first recorded error wins (set via compareAndSet)
+    private final ScheduledExecutorService executorService; // runs the background read and the idle reader check
+    private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
+    private final Object scheduleLock = new Object(); // held by run() and around access to backgroundScheduleTask
+    private final AtomicLong scheduleCount = new AtomicLong(0); // incremented per schedule request; reset to 0 by run()
+    final private Stopwatch scheduleDelayStopwatch; // measures submit-to-run delay of the background read
+    final private Stopwatch readNextDelayStopwatch; // measures time between reads and the read call itself
+    private DLSN startDLSN; // records before this DLSN are skipped by run()
+    private ReadAheadEntryReader readAheadReader = null; // created by the first read request
+    private int lastPosition = 0; // position of the last record handed to a request; used for gap detection
+    private final boolean positionGapDetectionEnabled; // when true a detected gap fails the read, otherwise it only raises an alert
+    private final int idleErrorThresholdMillis;
+    final ScheduledFuture<?> idleReaderTimeoutTask; // null when no idle reader check was scheduled
+    private ScheduledFuture<?> backgroundScheduleTask = null; // delayed re-run scheduled for a request with a wait deadline
+    // time since run() last started processing a pending request
+    private final Stopwatch lastProcessTime;
+
+    protected Promise<Void> closeFuture = null; // non-null once asyncClose() has been called
+
+    private boolean lockStream = false; // set by lockStream(); enables the read-lock check in checkClosedOrInError()
+
+    private final boolean returnEndOfStreamRecord; // when false, reaching an end-of-stream record records EndOfStreamException
+
+    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { // delayed task scheduled by run() for requests that may still wait
+        @Override
+        public void run() {
+            synchronized (scheduleLock) {
+                backgroundScheduleTask = null; // this delayed task has fired; drop the handle to it
+            }
+            scheduleBackgroundRead();
+        }
+    };
+
+    // State
+    private Entry.Reader currentEntry = null; // entry currently being consumed record by record
+    private LogRecordWithDLSN nextRecord = null; // record prefetched from currentEntry, returned by the next readNextRecord()
+
+    // Failure Injector
+    private boolean disableProcessingReadRequests = false; // when true, run() returns without serving queued requests
+
+    // Stats
+    private final OpStatsLogger readNextExecTime; // "read_next_exec"
+    private final OpStatsLogger delayUntilPromiseSatisfied; // "delay_until_promise_satisfied"
+    private final OpStatsLogger timeBetweenReadNexts; // "time_between_read_next"
+    private final OpStatsLogger futureSetLatency; // "future_set"
+    private final OpStatsLogger scheduleLatency; // "schedule"
+    private final OpStatsLogger backgroundReaderRunTime; // "background_read"
+    private final Counter idleReaderCheckCount; // "idle_reader_check_total"
+    private final Counter idleReaderCheckIdleReadRequestCount; // "idle_reader_check_idle_read_requests"
+    private final Counter idleReaderCheckIdleReadAheadCount; // "idle_reader_check_idle_readahead"
+    private final Counter idleReaderError; // "idle_reader_error"
+
+    private class PendingReadRequest { // one queued read: the records collected so far plus the promise to satisfy
+        private final Stopwatch enqueueTime; // started when the request is created
+        private final int numEntries; // maximum number of records to collect
+        private final List<LogRecordWithDLSN> records;
+        private final Promise<List<LogRecordWithDLSN>> promise;
+        private final long deadlineTime; // <= 0 means the request does not wait for more records
+        private final TimeUnit deadlineTimeUnit;
+
+        PendingReadRequest(int numEntries,
+                           long deadlineTime,
+                           TimeUnit deadlineTimeUnit) {
+            this.numEntries = numEntries;
+            this.enqueueTime = Stopwatch.createStarted();
+            // optimize the space usage for single read.
+            if (numEntries == 1) {
+                this.records = new ArrayList<LogRecordWithDLSN>(1);
+            } else {
+                this.records = new ArrayList<LogRecordWithDLSN>();
+            }
+            this.promise = new Promise<List<LogRecordWithDLSN>>();
+            this.deadlineTime = deadlineTime;
+            this.deadlineTimeUnit = deadlineTimeUnit;
+        }
+
+        Promise<List<LogRecordWithDLSN>> getPromise() {
+            return promise;
+        }
+
+        long elapsedSinceEnqueue(TimeUnit timeUnit) {
+            return enqueueTime.elapsed(timeUnit);
+        }
+
+        void setException(Throwable throwable) { // fails the promise unless it is already satisfied
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) {
+                futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+
+        boolean hasReadRecords() {
+            return records.size() > 0;
+        }
+
+        boolean hasReadEnoughRecords() {
+            return records.size() >= numEntries;
+        }
+
+        long getRemainingWaitTime() { // result is expressed in deadlineTimeUnit
+            if (deadlineTime <= 0L) {
+                return 0L; // no deadline configured: run() completes the request as soon as it holds any record
+            }
+            return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit); // not positive once the deadline has passed
+        }
+
+        void addRecord(LogRecordWithDLSN record) {
+            records.add(record);
+        }
+
+        void complete() { // satisfies the promise with the collected records and records both latencies
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
+            }
+            delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
+            Stopwatch stopwatch = Stopwatch.createStarted(); // times only the promise.setValue call below
+            promise.setValue(records);
+            futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    BKAsyncLogReader(BKDistributedLogManager bkdlm,
+                     ScheduledExecutorService executorService,
+                     DLSN startDLSN,
+                     Optional<String> subscriberId,
+                     boolean returnEndOfStreamRecord,
+                     StatsLogger statsLogger) {
+        this.bkDistributedLogManager = bkdlm;
+        this.executorService = executorService;
+        this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+                this, true);
+        LOG.debug("Starting async reader at {}", startDLSN);
+        this.startDLSN = startDLSN;
+        this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); // started when a background read is submitted
+        this.readNextDelayStopwatch = Stopwatch.createStarted();
+        this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
+        this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
+        this.returnEndOfStreamRecord = returnEndOfStreamRecord;
+
+        // Stats: all metrics live under the "async_reader" scope
+        StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
+        futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
+        scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
+        backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
+        readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
+        timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
+        delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
+        idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
+        idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
+        idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
+        idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
+
+        // The stream is not locked at construction; lockStream() requests the read lock later.
+        this.lockStream = false;
+        this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); // NOTE(review): scheduled before lastProcessTime is assigned below, and the task reads that field — confirm this ordering is safe
+        this.lastProcessTime = Stopwatch.createStarted();
+    }
+
+    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+        if (idleErrorThresholdMillis < Integer.MAX_VALUE) { // a threshold of Integer.MAX_VALUE schedules nothing
+            // Don't run the task more than once every second (for sanity)
+            long period = Math.max(idleErrorThresholdMillis / 10, 1000);
+            // Except for short thresholds (tests?); clamp to 1 ms because scheduleAtFixedRate rejects a period <= 0
+            period = Math.max(1L, Math.min(period, idleErrorThresholdMillis / 5));
+
+            return executorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    PendingReadRequest nextRequest = pendingRequests.peek();
+
+                    idleReaderCheckCount.inc();
+                    if (null == nextRequest) {
+                        return; // nothing is waiting, so the reader cannot be idle
+                    }
+
+                    idleReaderCheckIdleReadRequestCount.inc();
+                    if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
+                        return; // the oldest request has not waited long enough yet
+                    }
+
+                    ReadAheadEntryReader readAheadReader = getReadAheadReader();
+
+                    // read request has been idle
+                    //   - cache has records but read request are idle,
+                    //     that means notification was missed between readahead and reader.
+                    //   - cache is empty and readahead is idle (no records added for a long time)
+                    idleReaderCheckIdleReadAheadCount.inc();
+                    try {
+                        if (null == readAheadReader || (!hasMoreRecords() &&
+                                readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+                            markReaderAsIdle();
+                            return;
+                        } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+                            markReaderAsIdle(); // records may exist, but run() has not processed a request for too long
+                        }
+                    } catch (IOException e) {
+                        setLastException(e);
+                        return;
+                    }
+                }
+            }, period, period, TimeUnit.MILLISECONDS);
+        }
+        return null; // no idle reader task scheduled
+    }
+
+    synchronized ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader; // null until the first read request has created it
+    }
+
+    void cancelIdleReaderTask() {
+        // Best effort: the task is null when no idle check was scheduled, and a failed cancel is only logged
+        try {
+            if (null != idleReaderTimeoutTask) {
+                idleReaderTimeoutTask.cancel(true);
+            }
+        } catch (Exception exc) {
+            LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName(), exc);
+        }
+    }
+
+    private void markReaderAsIdle() { // records an IdleReaderException and fails every pending read with it
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for " + idleErrorThresholdMillis +" ms");
+        setLastException(ire);
+        // cancel all pending reads directly rather than notifying on error
+        // because idle reader could happen on idle read requests that usually means something wrong
+        // in scheduling reads
+        cancelAllPendingReads(ire);
+    }
+
+    protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
+        if (null != readAheadReader) { // the first read creates the readahead reader; too late to move the start position
+            throw new UnexpectedException("Couldn't reset from dlsn after reader already starts reading.");
+        }
+        startDLSN = fromDLSN;
+    }
+
+    @VisibleForTesting
+    public synchronized DLSN getStartDLSN() {
+        return startDLSN; // run() skips records whose DLSN is below this value
+    }
+
+    public Future<Void> lockStream() {
+        this.lockStream = true; // from now on checkClosedOrInError() also verifies the read lock
+        return readHandler.lockStream();
+    }
+
+    private boolean checkClosedOrInError(String operation) { // true if an error is recorded; pending reads are then failed
+        if (null == lastException.get()) { // only probe for new errors while none is recorded
+            try {
+                if (null != readHandler && null != getReadAheadReader()) {
+                    getReadAheadReader().checkLastException();
+                }
+
+                bkDistributedLogManager.checkClosedOrInError(operation);
+            } catch (IOException exc) {
+                setLastException(exc);
+            }
+        }
+
+        if (lockStream) { // the read lock is checked only after lockStream() was called
+            try {
+                readHandler.checkReadLock();
+            } catch (IOException ex) {
+                setLastException(ex);
+            }
+        }
+
+        if (null != lastException.get()) {
+            LOG.trace("Cancelling pending reads");
+            cancelAllPendingReads(lastException.get());
+            return true;
+        }
+
+        return false;
+    }
+
+    private void setLastException(IOException exc) {
+        lastException.compareAndSet(null, exc); // keep the first error; later ones are ignored
+    }
+
+    @Override
+    public String getStreamName() {
+        return bkDistributedLogManager.getStreamName(); // delegates to the owning log manager
+    }
+
+    /**
+     * @return A promise that, when satisfied, will contain the next log record with its DLSN.
+     */
+    @Override
+    public synchronized Future<LogRecordWithDLSN> readNext() {
+        return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); // one record, no wait deadline; unwrap the list
+    }
+
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+        return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); // deadline 0: satisfied as soon as any record is available
+    }
+
+    @Override
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
+                                                                 long waitTime,
+                                                                 TimeUnit timeUnit) {
+        return readInternal(numEntries, waitTime, timeUnit); // waitTime bounds how long a partially filled request keeps waiting
+    }
+
+    /**
+     * Read up to <i>numEntries</i> entries; the future is satisfied once 1 to <i>numEntries</i> entries are ready.
+     *
+     * @param numEntries num entries to read
+     * @param deadlineTime how long a partially filled request may keep waiting; values &lt;= 0 mean no waiting
+     * @param deadlineTimeUnit unit of <i>deadlineTime</i>
+     * @return A promise that is satisfied with a non-empty list of log records with their DLSN.
+     */
+    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
+                                                                      long deadlineTime,
+                                                                      TimeUnit deadlineTimeUnit) {
+        timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start(); // from here the stopwatch times this call itself
+        final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
+
+        if (null == readAheadReader) { // first read: create the readahead reader and start it asynchronously
+            try {
+                final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+                        getStreamName(),
+                        getStartDLSN(),
+                        bkDistributedLogManager.getConf(),
+                        readHandler,
+                        bkDistributedLogManager.getReaderEntryStore(),
+                        bkDistributedLogManager.getScheduler(),
+                        Ticker.systemTicker(),
+                        bkDistributedLogManager.alertStatsLogger);
+                readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        try {
+                            readHandler.registerListener(readAheadEntryReader);
+                            readHandler.asyncStartFetchLogSegments()
+                                    .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                                        @Override
+                                        public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                                            readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+                                            readAheadEntryReader.start(logSegments.getValue());
+                                            return BoxedUnit.UNIT;
+                                        }
+                                    }); // NOTE(review): no failure handler is attached to this mapped future — confirm a fetch failure is surfaced elsewhere
+                        } catch (Exception exc) {
+                            notifyOnError(exc);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        notifyOnError(cause);
+                    }
+                });
+            } catch (IOException ioe) {
+                notifyOnError(ioe);
+            }
+        }
+
+        if (checkClosedOrInError("readNext")) {
+            readRequest.setException(lastException.get()); // fail fast; the request is never queued
+        } else {
+            boolean queueEmpty = pendingRequests.isEmpty();
+            pendingRequests.add(readRequest);
+
+            if (queueEmpty) { // schedule a background read only when this request is the first one queued
+                scheduleBackgroundRead();
+            }
+        }
+
+        readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start(); // restart to measure the gap until the next read call
+
+        return readRequest.getPromise();
+    }
+
+    public synchronized void scheduleBackgroundRead() {
+        // A closed reader never needs another background pass.
+        if (null != closeFuture) {
+            return;
+        }
+        // Only the caller that moves the counter off zero submits the task;
+        // every other caller just bumps the count.
+        if (0 == scheduleCount.getAndIncrement()) {
+            scheduleDelayStopwatch.reset().start();
+            executorService.submit(this);
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        // Mark the reader closed exactly once; repeated calls return the future of the first call
+        ReadCancelledException exception;
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+            exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
+            setLastException(exception); // has no effect if an earlier error is already recorded
+        }
+
+        // Do this after we have checked that the reader was not previously closed
+        cancelIdleReaderTask();
+
+        synchronized (scheduleLock) {
+            if (null != backgroundScheduleTask) {
+                backgroundScheduleTask.cancel(true); // drop any delayed re-run scheduled for a waiting request
+            }
+        }
+
+        cancelAllPendingReads(exception);
+
+        ReadAheadEntryReader readAheadReader = getReadAheadReader();
+        if (null != readAheadReader) {
+            readHandler.unregisterListener(readAheadReader);
+            readAheadReader.removeStateChangeNotification(this);
+        }
+        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, // NOTE(review): readAheadReader may be null here — confirm closeSequence accepts null
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    private void cancelAllPendingReads(Throwable throwExc) {
+        // Drain with poll() rather than iterate-then-clear(), so no request is removed without its promise being failed
+        for (PendingReadRequest req = pendingRequests.poll(); null != req; req = pendingRequests.poll()) {
+            req.setException(throwExc);
+        }
+    }
+
+    synchronized boolean hasMoreRecords() throws IOException {
+        if (null == readAheadReader) {
+            return false;
+        }
+        if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
+            return true;
+        } else if (null != currentEntry) {
+            nextRecord = currentEntry.nextRecord();
+            return null != nextRecord;
+        }
+        return false;
+    }
+
+    private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+        if (null == readAheadReader) {
+            return null;
+        }
+        if (null == currentEntry) {
+            currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+            // no entry after reading from read ahead then return null
+            if (null == currentEntry) {
+                return null;
+            }
+        }
+
+        LogRecordWithDLSN recordToReturn;
+        if (null == nextRecord) {
+            nextRecord = currentEntry.nextRecord();
+            // no more records in current entry
+            if (null == nextRecord) {
+                currentEntry = null;
+                return readNextRecord();
+            }
+        }
+
+        // found a record to return and prefetch the next one
+        recordToReturn = nextRecord;
+        nextRecord = currentEntry.nextRecord();
+        return recordToReturn;
+    }
+
+    @Override
+    public void run() {
+        synchronized(scheduleLock) { // the whole background pass runs under scheduleLock
+            if (scheduleDelayStopwatch.isRunning()) {
+                scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+
+            Stopwatch runTime = Stopwatch.createStarted();
+            int iterations = 0;
+            long scheduleCountLocal = scheduleCount.get();
+            LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
+            while(true) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
+                }
+
+                PendingReadRequest nextRequest = null;
+                synchronized(this) {
+                    nextRequest = pendingRequests.peek();
+
+                    // Queue is empty, nothing to read, return
+                    if (null == nextRequest) {
+                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
+                        scheduleCount.set(0); // lets the next scheduleBackgroundRead() submit a new run
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+
+                    if (disableProcessingReadRequests) {
+                        LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
+                        return; // failure injection: queued requests are left untouched
+                    }
+                }
+                lastProcessTime.reset().start(); // the idle reader check reads this stopwatch
+
+                // If the oldest pending promise is interrupted then we must mark
+                // the reader in error and abort all pending reads since we don't
+                // know the last consumed read
+                if (null == lastException.get()) {
+                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
+                        setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
+                                nextRequest.getPromise().isInterrupted().get()));
+                    }
+                }
+
+                if (checkClosedOrInError("readNext")) { // also fails all pending reads when an error is recorded
+                    if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
+                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                    return;
+                }
+
+                try {
+                    // Fail the request when the failure injector asks to simulate errors
+                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
+                        throw new IOException("Reader Simulated Exception");
+                    }
+                    LogRecordWithDLSN record;
+                    while (!nextRequest.hasReadEnoughRecords()) {
+                        // read single record, skipping control records and records before the start DLSN
+                        do {
+                            record = readNextRecord();
+                        } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
+                        if (null == record) {
+                            break; // nothing more available right now
+                        } else {
+                            if (record.isEndOfStream() && !returnEndOfStreamRecord) {
+                                setLastException(new EndOfStreamException("End of Stream Reached for "
+                                        + readHandler.getFullyQualifiedName()));
+                                break;
+                            }
+
+                            // gap detection: always alert, but only fail the read when detection is enabled
+                            if (recordPositionsContainsGap(record, lastPosition)) {
+                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
+                                if (positionGapDetectionEnabled) {
+                                    throw new DLIllegalStateException("Gap detected between records at record = " + record);
+                                }
+                            }
+                            lastPosition = record.getLastPositionWithinLogSegment();
+
+                            nextRequest.addRecord(record);
+                        }
+                    };
+                } catch (IOException exc) {
+                    setLastException(exc);
+                    if (!(exc instanceof LogNotFoundException)) {
+                        LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    continue; // loop back so checkClosedOrInError() fails the pending reads
+                }
+
+                if (nextRequest.hasReadRecords()) {
+                    long remainingWaitTime = nextRequest.getRemainingWaitTime();
+                    if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        scheduleDelayStopwatch.reset().start();
+                        scheduleCount.set(0);
+                        // the request could still wait for more records
+                        backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit);
+                        return;
+                    }
+
+                    PendingReadRequest request = pendingRequests.poll();
+                    if (null != request && nextRequest == request) { // the head of the queue is still the request we served
+                        request.complete();
+                        if (null != backgroundScheduleTask) {
+                            backgroundScheduleTask.cancel(true);
+                            backgroundScheduleTask = null;
+                        }
+                    } else {
+                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
+                                + nextRequest.records.get(0).getDlsn());
+                        nextRequest.setException(ise);
+                        if (null != request) {
+                            request.setException(ise);
+                        }
+                        // We should never get here as we should have exited the loop if
+                        // pendingRequests were empty
+                        bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
+                                nextRequest.records.get(0).getDlsn());
+                        setLastException(ise);
+                    }
+                } else {
+                    if (0 == scheduleCountLocal) { // no schedule requests left to account for: stop this pass
+                        LOG.trace("Schedule count dropping to zero", lastException.get());
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+                    scheduleCountLocal = scheduleCount.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    /**
+     * Decide whether <i>record</i> leaves a hole after the previously consumed position.
+     *
+     * @param record
+     *          the record that was just read
+     * @param lastPosition
+     *          last position of the previously consumed record; 0 is treated as "no previous position"
+     * @return true only when none of the "no gap" conditions computed below holds
+     */
+    private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
+        // the record sits at position 1 of its log segment
+        final boolean startsSegment = record.getPositionWithinLogSegment() == 1;
+        // end of stream records are exempt from the check
+        final boolean isEndOfStream = record.isEndOfStream();
+        // no previous position to compare against
+        final boolean noPreviousPosition = lastPosition == 0;
+        // the record directly follows the previous position
+        final boolean contiguous = record.getPositionWithinLogSegment() == lastPosition + 1;
+
+        return !(startsSegment || isEndOfStream || noPreviousPosition || contiguous);
+    }
+
+    /**
+     * Callback for a failure of the background activity: records the failure as the
+     * reader's last exception (wrapping it into an {@link IOException} when it is not
+     * one already) and schedules a background read.
+     */
+    @Override
+    public void notifyOnError(Throwable cause) {
+        final IOException failure = (cause instanceof IOException)
+                ? (IOException) cause
+                : new IOException(cause);
+        setLastException(failure);
+        scheduleBackgroundRead();
+    }
+
+    /**
+     * Triggered when the background activity completes an operation.
+     * Schedules a background read so that queued read requests get another
+     * chance to be served.
+     */
+    @Override
+    public void notifyOnOperationComplete() {
+        scheduleBackgroundRead();
+    }
+
+    /**
+     * Test hook: switches on error injection on the log manager's failure injector.
+     */
+    @VisibleForTesting
+    void simulateErrors() {
+        bkDistributedLogManager.getFailureInjector().injectErrors(true);
+    }
+
+    /**
+     * Test hook: forwards to the read handler's
+     * <code>disableReadAheadLogSegmentsNotification()</code>.
+     */
+    @VisibleForTesting
+    synchronized void disableReadAheadLogSegmentsNotification() {
+        readHandler.disableReadAheadLogSegmentsNotification();
+    }
+
+    /**
+     * Test hook: raises the <code>disableProcessingReadRequests</code> flag.
+     * There is no counterpart here that lowers it again.
+     */
+    @VisibleForTesting
+    synchronized void disableProcessingReadRequests() {
+        disableProcessingReadRequests = true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
deleted file mode 100644
index e347012..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ /dev/null
@@ -1,748 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ReadCancelledException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Throw;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * BookKeeper based {@link AsyncLogReader} implementation.
- *
- * <h3>Metrics</h3>
- * All the metrics are exposed under `async_reader`.
- * <ul>
- * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
- * if it is high, it means that the caller takes time on processing the result of read requests.
- * The side effect is blocking subsequent reads.
- * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
- * <li> `async_reader`/background_read: opstats. time spent on background reads.
- * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
- * <li> `async_reader`/time_between_read_next: opstats. time spent between two consecutive {@link #readNext()} calls.
- * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
- * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
- * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
- * </ul>
- */
-class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
-    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
-
-    // Maps the list of records produced by a read to its first element; readNext()
-    // applies it to the result of a one-record read.
-    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
-            new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() {
-                @Override
-                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
-                    return records.get(0);
-                }
-            };
-
-    protected final BKDistributedLogManager bkDistributedLogManager;
-    protected final BKLogReadHandler readHandler;
-    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
-    private final ScheduledExecutorService executorService;
-    private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
-    private final Object scheduleLock = new Object();
-    private final AtomicLong scheduleCount = new AtomicLong(0);
-    final private Stopwatch scheduleDelayStopwatch;
-    final private Stopwatch readNextDelayStopwatch;
-    private DLSN startDLSN;
-    private ReadAheadEntryReader readAheadReader = null;
-    private int lastPosition = 0;
-    private final boolean positionGapDetectionEnabled;
-    private final int idleErrorThresholdMillis;
-    final ScheduledFuture<?> idleReaderTimeoutTask;
-    private ScheduledFuture<?> backgroundScheduleTask = null;
-    // last process time
-    private final Stopwatch lastProcessTime;
-
-    protected Promise<Void> closeFuture = null;
-
-    private boolean lockStream = false;
-
-    private final boolean returnEndOfStreamRecord;
-
-    // Task handed to the executor with a delay by run() when a request may still wait
-    // for more records: it forgets the handle of that delayed schedule (under
-    // scheduleLock) and then schedules a background read.
-    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
-        @Override
-        public void run() {
-            synchronized (scheduleLock) {
-                backgroundScheduleTask = null;
-            }
-            scheduleBackgroundRead();
-        }
-    };
-
-    // State
-    private Entry.Reader currentEntry = null;
-    private LogRecordWithDLSN nextRecord = null;
-
-    // Failure Injector
-    private boolean disableProcessingReadRequests = false;
-
-    // Stats
-    private final OpStatsLogger readNextExecTime;
-    private final OpStatsLogger delayUntilPromiseSatisfied;
-    private final OpStatsLogger timeBetweenReadNexts;
-    private final OpStatsLogger futureSetLatency;
-    private final OpStatsLogger scheduleLatency;
-    private final OpStatsLogger backgroundReaderRunTime;
-    private final Counter idleReaderCheckCount;
-    private final Counter idleReaderCheckIdleReadRequestCount;
-    private final Counter idleReaderCheckIdleReadAheadCount;
-    private final Counter idleReaderError;
-
-    /**
-     * A queued read request: collects records until it is either completed with them
-     * or failed with an exception. Latencies are reported to the enclosing reader's stats.
-     */
-    private class PendingReadRequest {
-        private final Stopwatch enqueueTime;
-        private final int numEntries;
-        private final List<LogRecordWithDLSN> records;
-        private final Promise<List<LogRecordWithDLSN>> promise;
-        private final long deadlineTime;
-        private final TimeUnit deadlineTimeUnit;
-
-        /**
-         * @param numEntries
-         *          number of records after which the request counts as having read enough
-         * @param deadlineTime
-         *          how long the request may wait, measured from enqueue; non-positive means no waiting
-         * @param deadlineTimeUnit
-         *          unit of <i>deadlineTime</i>
-         */
-        PendingReadRequest(int numEntries,
-                           long deadlineTime,
-                           TimeUnit deadlineTimeUnit) {
-            this.enqueueTime = Stopwatch.createStarted();
-            this.numEntries = numEntries;
-            // a single read only ever needs one slot
-            this.records = (1 == numEntries)
-                    ? new ArrayList<LogRecordWithDLSN>(1)
-                    : new ArrayList<LogRecordWithDLSN>();
-            this.promise = new Promise<List<LogRecordWithDLSN>>();
-            this.deadlineTime = deadlineTime;
-            this.deadlineTimeUnit = deadlineTimeUnit;
-        }
-
-        Promise<List<LogRecordWithDLSN>> getPromise() {
-            return promise;
-        }
-
-        long elapsedSinceEnqueue(TimeUnit timeUnit) {
-            return enqueueTime.elapsed(timeUnit);
-        }
-
-        /**
-         * Fail the promise with <i>throwable</i> unless it has been updated already;
-         * the failure stats are only recorded when this call did the update.
-         */
-        void setException(Throwable throwable) {
-            Stopwatch setTimer = Stopwatch.createStarted();
-            boolean updated = promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable));
-            if (!updated) {
-                return;
-            }
-            futureSetLatency.registerFailedEvent(setTimer.stop().elapsed(TimeUnit.MICROSECONDS));
-            delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
-        }
-
-        boolean hasReadRecords() {
-            return !records.isEmpty();
-        }
-
-        boolean hasReadEnoughRecords() {
-            return numEntries <= records.size();
-        }
-
-        /**
-         * @return 0 when the request has no positive deadline, otherwise the deadline minus
-         *         the time elapsed since enqueue (may be negative once the deadline has passed)
-         */
-        long getRemainingWaitTime() {
-            return (deadlineTime <= 0L)
-                    ? 0L
-                    : deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
-        }
-
-        void addRecord(LogRecordWithDLSN record) {
-            records.add(record);
-        }
-
-        /**
-         * Satisfy the promise with the records collected so far and record the latencies.
-         */
-        void complete() {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
-            }
-            long queuedMicros = enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS);
-            delayUntilPromiseSatisfied.registerSuccessfulEvent(queuedMicros);
-            Stopwatch setTimer = Stopwatch.createStarted();
-            promise.setValue(records);
-            long setMicros = setTimer.stop().elapsed(TimeUnit.MICROSECONDS);
-            futureSetLatency.registerSuccessfulEvent(setMicros);
-        }
-    }
-
-    /**
-     * Create an async reader on top of the given log manager.
-     *
-     * @param bkdlm
-     *          log manager; supplies the read handler and the configuration values read below
-     * @param executorService
-     *          executor used for background reads and the periodic idle reader check
-     * @param startDLSN
-     *          DLSN to start reading from
-     * @param subscriberId
-     *          optional subscriber id, passed on when creating the read handler
-     * @param returnEndOfStreamRecord
-     *          whether the end of stream record is handed to the caller instead of
-     *          failing the reader with an end of stream exception
-     * @param statsLogger
-     *          stats logger; metrics are registered under the `async_reader` scope
-     */
-    BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm,
-                         ScheduledExecutorService executorService,
-                         DLSN startDLSN,
-                         Optional<String> subscriberId,
-                         boolean returnEndOfStreamRecord,
-                         StatsLogger statsLogger) {
-        this.bkDistributedLogManager = bkdlm;
-        this.executorService = executorService;
-        // this reader is passed to the read handler as well
-        this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
-                this, true);
-        LOG.debug("Starting async reader at {}", startDLSN);
-        this.startDLSN = startDLSN;
-        this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
-        this.readNextDelayStopwatch = Stopwatch.createStarted();
-        this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
-        this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
-        this.returnEndOfStreamRecord = returnEndOfStreamRecord;
-
-        // Stats
-        StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
-        futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
-        scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
-        backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
-        readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
-        timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
-        delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
-        idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
-        idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
-        idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
-        idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
-
-        // The stream is not locked at construction time; lockStream() turns the flag on later.
-        this.lockStream = false;
-        // The idle reader check reads idleErrorThresholdMillis, executorService and the
-        // counters above, so it is scheduled only after they have been assigned.
-        this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
-        this.lastProcessTime = Stopwatch.createStarted();
-    }
-
-    /**
-     * Schedule the periodic check that fails all pending reads once the oldest pending
-     * request has waited for at least the idle error threshold and the reader looks idle.
-     *
-     * @return the handle of the periodic check, or null when the threshold is not below
-     *         {@link Integer#MAX_VALUE} and therefore no check is scheduled
-     */
-    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
-        if (idleErrorThresholdMillis >= Integer.MAX_VALUE) {
-            return null;
-        }
-
-        // Don't run the task more than once per second (for sanity)
-        long period = Math.max(idleErrorThresholdMillis / 10, 1000);
-        // Except when idle reader threshold is less than a second (tests?)
-        period = Math.min(period, idleErrorThresholdMillis / 5);
-        // A threshold below 5 ms would yield a period of 0 (or less), which
-        // scheduleAtFixedRate rejects with an IllegalArgumentException.
-        period = Math.max(period, 1L);
-
-        return executorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                PendingReadRequest nextRequest = pendingRequests.peek();
-
-                idleReaderCheckCount.inc();
-                if (null == nextRequest) {
-                    return;
-                }
-
-                idleReaderCheckIdleReadRequestCount.inc();
-                if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
-                    return;
-                }
-
-                ReadAheadEntryReader reader = getReadAheadReader();
-
-                // read request has been idle
-                //   - cache has records but read request are idle,
-                //     that means notification was missed between readahead and reader.
-                //   - cache is empty and readahead is idle (no records added for a long time)
-                idleReaderCheckIdleReadAheadCount.inc();
-                try {
-                    boolean readAheadIdle = null == reader || (!hasMoreRecords() &&
-                            reader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS));
-                    if (readAheadIdle
-                            || lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
-                        markReaderAsIdle();
-                    }
-                } catch (IOException e) {
-                    setLastException(e);
-                }
-            }
-        }, period, period, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return the readahead entry reader, or null as long as none has been created
-     */
-    synchronized ReadAheadEntryReader getReadAheadReader() {
-        return readAheadReader;
-    }
-
-    /**
-     * Cancel the periodic idle reader check, interrupting it if it is running.
-     * Does nothing when no check was scheduled. A failure to cancel is logged
-     * (together with its cause) and otherwise ignored.
-     */
-    void cancelIdleReaderTask() {
-        // Do this after we have checked that the reader was not previously closed
-        if (null == idleReaderTimeoutTask) {
-            return;
-        }
-        try {
-            idleReaderTimeoutTask.cancel(true);
-        } catch (Exception exc) {
-            // best effort: keep going, but don't lose the reason
-            LOG.info("{}: Failed to cancel the background idle reader timeout task",
-                    readHandler.getFullyQualifiedName(), exc);
-        }
-    }
-
-    /**
-     * Put the reader into the idle error state: bump the idle error counter, record an
-     * {@link IdleReaderException} as last exception and fail every pending read with it.
-     */
-    private void markReaderAsIdle() {
-        idleReaderError.inc();
-        final String message = "Reader on stream " + readHandler.getFullyQualifiedName()
-                + " is idle for " + idleErrorThresholdMillis + " ms";
-        final IdleReaderException idleException = new IdleReaderException(message);
-        setLastException(idleException);
-        // Pending reads are failed directly instead of going through notifyOnError:
-        // an idle reader usually means something is wrong with scheduling reads.
-        cancelAllPendingReads(idleException);
-    }
-
-    /**
-     * Change the DLSN the reader starts from. Only allowed before the readahead
-     * reader has been created.
-     *
-     * @param fromDLSN
-     *          the new start DLSN
-     * @throws UnexpectedException
-     *          if the readahead reader already exists
-     */
-    protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
-        if (null != readAheadReader) {
-            throw new UnexpectedException("Couldn't reset from dlsn after reader already starts reading.");
-        }
-        startDLSN = fromDLSN;
-    }
-
-    /**
-     * @return the DLSN this reader starts (or started) reading from
-     */
-    @VisibleForTesting
-    public synchronized DLSN getStartDLSN() {
-        return startDLSN;
-    }
-
-    /**
-     * Mark this reader as a locking reader and ask the read handler to lock the stream.
-     * Once marked, {@link #checkClosedOrInError(String)} also checks the read lock.
-     *
-     * @return the future returned by the read handler's <code>lockStream()</code>
-     */
-    public Future<Void> lockStream() {
-        this.lockStream = true;
-        return readHandler.lockStream();
-    }
-
-    /**
-     * Check whether the reader is in an error state, recording any newly found error
-     * as the last exception. When an error is present, all pending reads are failed with it.
-     *
-     * @param operation
-     *          name of the operation, passed on to the log manager's check
-     * @return true if a last exception is set (pending reads have been cancelled), false otherwise
-     */
-    private boolean checkClosedOrInError(String operation) {
-        // Only look for new errors while none has been recorded yet.
-        if (null == lastException.get()) {
-            try {
-                if (null != readHandler && null != getReadAheadReader()) {
-                    getReadAheadReader().checkLastException();
-                }
-
-                bkDistributedLogManager.checkClosedOrInError(operation);
-            } catch (IOException exc) {
-                setLastException(exc);
-            }
-        }
-
-        // A locking reader additionally verifies its read lock on every check.
-        if (lockStream) {
-            try {
-                readHandler.checkReadLock();
-            } catch (IOException ex) {
-                setLastException(ex);
-            }
-        }
-
-        if (null != lastException.get()) {
-            LOG.trace("Cancelling pending reads");
-            cancelAllPendingReads(lastException.get());
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Record <i>exc</i> as the reader's last exception. Only the first recorded
-     * exception is kept; later calls leave it untouched.
-     */
-    private void setLastException(IOException exc) {
-        lastException.compareAndSet(null, exc);
-    }
-
-    /**
-     * @return the stream name reported by the underlying log manager
-     */
-    @Override
-    public String getStreamName() {
-        return bkDistributedLogManager.getStreamName();
-    }
-
-    /**
-     * Read the next record. Implemented as a one-record read without wait time
-     * whose result list is mapped to its first element.
-     *
-     * @return A promise that when satisfied will contain the Log Record with its DLSN.
-     */
-    @Override
-    public synchronized Future<LogRecordWithDLSN> readNext() {
-        return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
-    }
-
-    /**
-     * Read up to <i>numEntries</i> records without any wait time.
-     *
-     * @param numEntries
-     *          num entries to read
-     * @return A promise for the list of log records with their DLSN.
-     */
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
-        return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Read up to <i>numEntries</i> records, allowing the request to wait up to
-     * <i>waitTime</i> for more records once it holds some but fewer than requested.
-     *
-     * @param numEntries
-     *          num entries to read
-     * @param waitTime
-     *          wait time, measured from the moment the request is enqueued
-     * @param timeUnit
-     *          unit of <i>waitTime</i>
-     * @return A promise for the list of log records with their DLSN.
-     */
-    @Override
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
-                                                                 long waitTime,
-                                                                 TimeUnit timeUnit) {
-        return readInternal(numEntries, waitTime, timeUnit);
-    }
-
-    /**
-     * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are
-     * ready (1 to <i>numEntries</i>).
-     *
-     * <p>The first call creates the readahead entry reader and starts it asynchronously once
-     * the log stream is known to exist. If the reader is closed or in error, the returned
-     * promise is failed right away; otherwise the request is queued.
-     *
-     * @param numEntries
-     *          num entries to read
-     * @param deadlineTime
-     *          how long the request may wait for more records; non-positive means no waiting
-     * @param deadlineTimeUnit
-     *          unit of <i>deadlineTime</i>
-     * @return A promise that satisfied with a non-empty list of log records with their DLSN.
-     */
-    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
-                                                                      long deadlineTime,
-                                                                      TimeUnit deadlineTimeUnit) {
-        // time since the previous read call returned
-        timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
-        readNextDelayStopwatch.reset().start();
-        final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
-
-        // lazily create the readahead reader on the first read
-        if (null == readAheadReader) {
-            try {
-                final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
-                        getStreamName(),
-                        getStartDLSN(),
-                        bkDistributedLogManager.getConf(),
-                        readHandler,
-                        bkDistributedLogManager.getReaderEntryStore(),
-                        bkDistributedLogManager.getScheduler(),
-                        Ticker.systemTicker(),
-                        bkDistributedLogManager.alertStatsLogger);
-                readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        try {
-                            readHandler.registerListener(readAheadEntryReader);
-                            readHandler.asyncStartFetchLogSegments()
-                                    .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                                        @Override
-                                        public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                                            // subscribe this reader to state changes, then start
-                                            // readahead with the fetched log segments
-                                            readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this);
-                                            readAheadEntryReader.start(logSegments.getValue());
-                                            return BoxedUnit.UNIT;
-                                        }
-                                    });
-                        } catch (Exception exc) {
-                            notifyOnError(exc);
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        notifyOnError(cause);
-                    }
-                });
-            } catch (IOException ioe) {
-                notifyOnError(ioe);
-            }
-        }
-
-        if (checkClosedOrInError("readNext")) {
-            readRequest.setException(lastException.get());
-        } else {
-            boolean queueEmpty = pendingRequests.isEmpty();
-            pendingRequests.add(readRequest);
-
-            // a background read is only scheduled from here when the queue was empty before
-            if (queueEmpty) {
-                scheduleBackgroundRead();
-            }
-        }
-
-        // time spent inside this call; the stopwatch restarts to measure the gap to the next call
-        readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
-        readNextDelayStopwatch.reset().start();
-
-        return readRequest.getPromise();
-    }
-
-    /**
-     * Request a run of the background reader. The reader is submitted to the executor
-     * only when the schedule counter was zero before this call; otherwise the counter
-     * is just incremented. Does nothing once the reader is closed.
-     */
-    public synchronized void scheduleBackgroundRead() {
-        // a closed reader never schedules another background read
-        if (null != closeFuture) {
-            return;
-        }
-
-        // some earlier call already moved the counter off zero
-        if (0 != scheduleCount.getAndIncrement()) {
-            return;
-        }
-
-        scheduleDelayStopwatch.reset().start();
-        executorService.submit(this);
-    }
-
-    /**
-     * Close the reader asynchronously. The first call records a
-     * {@link ReadCancelledException} as last exception, cancels the idle reader check and
-     * any delayed background schedule, fails all pending reads, detaches the readahead
-     * reader and then closes the readahead reader and the read handler. Later calls
-     * return the future of the first call.
-     *
-     * @return future representing the close
-     */
-    @Override
-    public Future<Void> asyncClose() {
-        ReadCancelledException exception;
-        Promise<Void> closePromise;
-        synchronized (this) {
-            // already closing or closed: hand out the same future
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closePromise = closeFuture = new Promise<Void>();
-            exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
-            setLastException(exception);
-        }
-
-        // Cancel the idle reader timeout task, interrupting if necessary.
-        // Do this after we have checked that the reader was not previously closed
-        cancelIdleReaderTask();
-
-        synchronized (scheduleLock) {
-            if (null != backgroundScheduleTask) {
-                backgroundScheduleTask.cancel(true);
-            }
-        }
-
-        cancelAllPendingReads(exception);
-
-        ReadAheadEntryReader readAheadReader = getReadAheadReader();
-        if (null != readAheadReader) {
-            readHandler.unregisterListener(readAheadReader);
-            readAheadReader.removeStateChangeNotification(this);
-        }
-        // the outcome of the close sequence is proxied to the promise handed to callers
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
-        return closePromise;
-    }
-
-    /**
-     * Fail every pending read request with <i>throwExc</i> and remove it from the queue.
-     *
-     * @param throwExc
-     *          the exception the pending promises are failed with
-     */
-    private void cancelAllPendingReads(Throwable throwExc) {
-        // Drain with poll() rather than iterate-then-clear(): a request enqueued while
-        // the iteration is in progress could otherwise be removed by clear() without
-        // its promise ever being failed.
-        PendingReadRequest request;
-        while (null != (request = pendingRequests.poll())) {
-            request.setException(throwExc);
-        }
-    }
-
-    /**
-     * Tell whether a record is available without waiting for readahead. As a side effect
-     * this may prefetch the next record of the current entry into <code>nextRecord</code>.
-     *
-     * @return true if readahead has cached entries, a record is already prefetched, or
-     *         the current entry yields another record
-     * @throws IOException
-     *          if fetching the next record from the current entry fails
-     */
-    synchronized boolean hasMoreRecords() throws IOException {
-        if (null == readAheadReader) {
-            return false;
-        }
-
-        final boolean buffered = readAheadReader.getNumCachedEntries() > 0 || null != nextRecord;
-        if (buffered) {
-            return true;
-        }
-
-        if (null == currentEntry) {
-            return false;
-        }
-
-        // try to prefetch from the entry currently being consumed
-        nextRecord = currentEntry.nextRecord();
-        return null != nextRecord;
-    }
-
-    /**
-     * Return the next record, moving on to the next readahead entry whenever the current
-     * one has no more records. The record following the returned one is prefetched into
-     * <code>nextRecord</code>.
-     *
-     * <p>Entries without records are skipped in a loop rather than by recursion, so a long
-     * run of such entries cannot grow the call stack.
-     *
-     * @return the next record, or null if the readahead reader does not exist yet or has
-     *         no entry available right now
-     * @throws IOException
-     *          if reading an entry or a record fails
-     */
-    private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
-        if (null == readAheadReader) {
-            return null;
-        }
-
-        while (true) {
-            if (null == currentEntry) {
-                currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
-                // no entry after reading from read ahead then return null
-                if (null == currentEntry) {
-                    return null;
-                }
-            }
-
-            if (null == nextRecord) {
-                nextRecord = currentEntry.nextRecord();
-                // no more records in current entry: move on to the next entry
-                if (null == nextRecord) {
-                    currentEntry = null;
-                    continue;
-                }
-            }
-
-            // found a record to return and prefetch the next one
-            LogRecordWithDLSN recordToReturn = nextRecord;
-            nextRecord = currentEntry.nextRecord();
-            return recordToReturn;
-        }
-    }
-
-    @Override
-    public void run() {
-        synchronized(scheduleLock) {
-            if (scheduleDelayStopwatch.isRunning()) {
-                scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-
-            Stopwatch runTime = Stopwatch.createStarted();
-            int iterations = 0;
-            long scheduleCountLocal = scheduleCount.get();
-            LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
-            while(true) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
-                }
-
-                PendingReadRequest nextRequest = null;
-                synchronized(this) {
-                    nextRequest = pendingRequests.peek();
-
-                    // Queue is empty, nothing to read, return
-                    if (null == nextRequest) {
-                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
-                        scheduleCount.set(0);
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        return;
-                    }
-
-                    if (disableProcessingReadRequests) {
-                        LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
-                        return;
-                    }
-                }
-                lastProcessTime.reset().start();
-
-                // If the oldest pending promise is interrupted then we must mark
-                // the reader in error and abort all pending reads since we dont
-                // know the last consumed read
-                if (null == lastException.get()) {
-                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                        setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
-                                nextRequest.getPromise().isInterrupted().get()));
-                    }
-                }
-
-                if (checkClosedOrInError("readNext")) {
-                    if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
-                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
-                    }
-                    backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                    return;
-                }
-
-                try {
-                    // Fail 10% of the requests when asked to simulate errors
-                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
-                        throw new IOException("Reader Simulated Exception");
-                    }
-                    LogRecordWithDLSN record;
-                    while (!nextRequest.hasReadEnoughRecords()) {
-                        // read single record
-                        do {
-                            record = readNextRecord();
-                        } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
-                        if (null == record) {
-                            break;
-                        } else {
-                            if (record.isEndOfStream() && !returnEndOfStreamRecord) {
-                                setLastException(new EndOfStreamException("End of Stream Reached for "
-                                        + readHandler.getFullyQualifiedName()));
-                                break;
-                            }
-
-                            // gap detection
-                            if (recordPositionsContainsGap(record, lastPosition)) {
-                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
-                                if (positionGapDetectionEnabled) {
-                                    throw new DLIllegalStateException("Gap detected between records at record = " + record);
-                                }
-                            }
-                            lastPosition = record.getLastPositionWithinLogSegment();
-
-                            nextRequest.addRecord(record);
-                        }
-                    };
-                } catch (IOException exc) {
-                    setLastException(exc);
-                    if (!(exc instanceof LogNotFoundException)) {
-                        LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
-                    }
-                    continue;
-                }
-
-                if (nextRequest.hasReadRecords()) {
-                    long remainingWaitTime = nextRequest.getRemainingWaitTime();
-                    if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        scheduleDelayStopwatch.reset().start();
-                        scheduleCount.set(0);
-                        // the request could still wait for more records
-                        backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit);
-                        return;
-                    }
-
-                    PendingReadRequest request = pendingRequests.poll();
-                    if (null != request && nextRequest == request) {
-                        request.complete();
-                        if (null != backgroundScheduleTask) {
-                            backgroundScheduleTask.cancel(true);
-                            backgroundScheduleTask = null;
-                        }
-                    } else {
-                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
-                                + nextRequest.records.get(0).getDlsn());
-                        nextRequest.setException(ise);
-                        if (null != request) {
-                            request.setException(ise);
-                        }
-                        // We should never get here as we should have exited the loop if
-                        // pendingRequests were empty
-                        bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
-                                nextRequest.records.get(0).getDlsn());
-                        setLastException(ise);
-                    }
-                } else {
-                    if (0 == scheduleCountLocal) {
-                        LOG.trace("Schedule count dropping to zero", lastException.get());
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        return;
-                    }
-                    scheduleCountLocal = scheduleCount.decrementAndGet();
-                }
-            }
-        }
-    }
-
-    private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
-        final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment());
-        final boolean endOfStreamRecord = record.isEndOfStream();
-        final boolean emptyLogSegment = (0 == lastPosition);
-        final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1));
-
-        return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment &&
-               !positionIncreasedByOne;
-    }
-
-    /**
-     * Triggered when the background activity encounters an exception
-     */
-    @Override
-    public void notifyOnError(Throwable cause) {
-        if (cause instanceof IOException) {
-            setLastException((IOException) cause);
-        } else {
-            setLastException(new IOException(cause));
-        }
-        scheduleBackgroundRead();
-    }
-
-    /**
-     * Triggered when the background activity completes an operation
-     */
-    @Override
-    public void notifyOnOperationComplete() {
-        scheduleBackgroundRead();
-    }
-
-    @VisibleForTesting
-    void simulateErrors() {
-        bkDistributedLogManager.getFailureInjector().injectErrors(true);
-    }
-
-    @VisibleForTesting
-    synchronized void disableReadAheadLogSegmentsNotification() {
-        readHandler.disableReadAheadLogSegmentsNotification();
-    }
-
-    @VisibleForTesting
-    synchronized void disableProcessingReadRequests() {
-        disableProcessingReadRequests = true;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 4963787..219c0cf 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -98,7 +98,7 @@ import java.util.concurrent.TimeUnit;
  * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
  * See {@link BKAsyncLogWriter} for detail stats.
  * <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`.
- * See {@link BKAsyncLogReaderDLSN} for detail stats.
+ * See {@link BKAsyncLogReader} for detail stats.
  * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under
  * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
  * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
@@ -955,7 +955,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     @Override
     public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
         Optional<String> subscriberId = Optional.absent();
-        AsyncLogReader reader = new BKAsyncLogReaderDLSN(
+        AsyncLogReader reader = new BKAsyncLogReader(
                 this,
                 scheduler,
                 fromDLSN,
@@ -993,7 +993,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
             return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
         }
-        final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
+        final BKAsyncLogReader reader = new BKAsyncLogReader(
                 BKDistributedLogManager.this,
                 scheduler,
                 fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
@@ -1077,7 +1077,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
             throws IOException {
         LOG.info("Create sync reader starting from {}", fromDLSN);
         checkClosedOrInError("getInputStream");
-        return new BKSyncLogReaderDLSN(
+        return new BKSyncLogReader(
                 conf,
                 this,
                 fromDLSN,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
new file mode 100644
index 0000000..308f42a
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.twitter.distributedlog.exceptions.EndOfStreamException;
+import com.twitter.distributedlog.exceptions.IdleReaderException;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Synchronous {@link LogReader} that drains entries supplied by a {@link ReadAheadEntryReader}.
+ *
+ * <p>The first error recorded by this reader is kept and rethrown by every later
+ * {@link #readNext(boolean)} call.
+ */
+class BKSyncLogReader implements LogReader, AsyncNotification {
+
+    private final BKDistributedLogManager bkdlm;
+    private final BKLogReadHandler readHandler;
+    // first error recorded by this reader; only ever set through compareAndSet(null, ...)
+    private final AtomicReference<IOException> readerException = new AtomicReference<IOException>(null);
+    private final int maxReadAheadWaitTime;
+    private Promise<Void> closeFuture;
+    private final Optional<Long> startTransactionId;
+    // false while records below startTransactionId still have to be skipped
+    private boolean positioned = false;
+    // entry currently being drained; null when the next one has to be fetched
+    private Entry.Reader currentEntry = null;
+
+    // readahead reader
+    ReadAheadEntryReader readAheadReader = null;
+
+    // idle reader settings
+    private final boolean shouldCheckIdleReader;
+    private final int idleErrorThresholdMillis;
+
+    // Stats
+    private final Counter idleReaderError;
+
+    /**
+     * Create the reader and start readahead, handing <i>startDLSN</i> to the readahead reader.
+     *
+     * @param startTransactionId when present, records with a smaller transaction id are skipped
+     * @throws IOException if the reader cannot be set up
+     */
+    BKSyncLogReader(DistributedLogConfiguration conf,
+                    BKDistributedLogManager bkdlm,
+                    DLSN startDLSN,
+                    Optional<Long> startTransactionId,
+                    StatsLogger statsLogger) throws IOException {
+        this.bkdlm = bkdlm;
+        this.readHandler = bkdlm.createReadHandler(Optional.<String>absent(), this, true);
+        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
+        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
+        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
+        this.startTransactionId = startTransactionId;
+
+        // start readahead
+        startReadAhead(startDLSN);
+        // without a start transaction id there is nothing to skip
+        positioned = !startTransactionId.isPresent();
+
+        // Stats
+        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
+        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
+    }
+
+    /**
+     * Build the readahead reader, register it with the read handler and start it on the
+     * log segments returned by the initial fetch.
+     */
+    private void startReadAhead(DLSN startDLSN) throws IOException {
+        readAheadReader = new ReadAheadEntryReader(bkdlm.getStreamName(), startDLSN, bkdlm.getConf(),
+                readHandler, bkdlm.getReaderEntryStore(), bkdlm.getScheduler(),
+                Ticker.systemTicker(), bkdlm.alertStatsLogger);
+        readHandler.registerListener(readAheadReader);
+        readHandler.asyncStartFetchLogSegments()
+                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                        readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
+                        readAheadReader.start(logSegments.getValue());
+                        return BoxedUnit.UNIT;
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
+    @VisibleForTesting
+    BKLogReadHandler getReadHandler() {
+        return readHandler;
+    }
+
+    /**
+     * Fetch the next entry from readahead; a non-blocking call polls once with a zero wait time.
+     *
+     * @return the next entry, or null if none was obtained
+     */
+    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
+        if (nonBlocking) {
+            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+        }
+        Entry.Reader entry = null;
+        // keep polling while readahead is still catching up and no error has been recorded
+        while (!readAheadReader.isReadAheadCaughtUp()
+                && null == readerException.get()
+                && null == entry) {
+            entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+        }
+        // reader is caught up: wait a single time instead of looping
+        if (null == entry
+                && readAheadReader.isReadAheadCaughtUp()
+                && null == readerException.get()) {
+            entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+        }
+        return entry;
+    }
+
+    /** Count the idle error, record it as the reader error if none is set yet, and throw it. */
+    private void markReaderAsIdle() throws IdleReaderException {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for more than " + idleErrorThresholdMillis + " ms");
+        readerException.compareAndSet(null, ire);
+        throw ire;
+    }
+
+    /**
+     * Return the next non-control record, or null if none could be read.
+     *
+     * @throws IOException the recorded reader error, or an error hit while reading
+     */
+    @Override
+    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException {
+        IOException recorded = readerException.get();
+        if (null != recorded) {
+            throw recorded;
+        }
+        LogRecordWithDLSN record = doReadNext(nonBlocking);
+        // no record is returned, check if the reader becomes idle
+        if (null == record && shouldCheckIdleReader
+                && readAheadReader.getNumCachedEntries() <= 0
+                && readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+            markReaderAsIdle();
+        }
+        return record;
+    }
+
+    /**
+     * Pull records from the current entry, fetching further entries as needed, until a
+     * record that may be returned to the caller is found.
+     */
+    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
+        while (true) {
+            if (null == currentEntry) {
+                currentEntry = readNextEntry(nonBlocking);
+                if (null == currentEntry) {
+                    return null;
+                }
+            }
+            LogRecordWithDLSN record = currentEntry.nextRecord();
+            if (null == record) {
+                // current entry is drained, move on to the next one
+                currentEntry = null;
+                continue;
+            }
+            // check if we reached the end of stream
+            if (record.isEndOfStream()) {
+                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
+                        + readHandler.getFullyQualifiedName());
+                readerException.compareAndSet(null, eos);
+                throw eos;
+            }
+            // skip control records
+            if (record.isControl()) {
+                continue;
+            }
+            if (!positioned) {
+                // still seeking: drop records that precede the start transaction id
+                if (record.getTransactionId() < startTransactionId.get()) {
+                    continue;
+                }
+                positioned = true;
+            }
+            return record;
+        }
+    }
+
+    /**
+     * Read up to <i>numLogRecords</i> records, stopping early when no record is available.
+     * A non-positive <i>numLogRecords</i> yields an empty list without reading.
+     */
+    @Override
+    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
+            throws IOException {
+        List<LogRecordWithDLSN> retList = new LinkedList<LogRecordWithDLSN>();
+        // the limit is checked before each read so that no extra record is consumed
+        while (retList.size() < numLogRecords) {
+            LogRecordWithDLSN record = readNext(nonBlocking);
+            if (null == record) {
+                break;
+            }
+            retList.add(record);
+        }
+        return retList;
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        readHandler.unregisterListener(readAheadReader);
+        readAheadReader.removeStateChangeNotification(this);
+        Utils.closeSequence(bkdlm.getScheduler(), true, readAheadReader, readHandler).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    @Override
+    public void close() throws IOException {
+        FutureUtils.result(asyncClose());
+    }
+
+    // Notification From ReadHandler
+
+    @Override
+    public void notifyOnError(Throwable cause) {
+        IOException ioe = (cause instanceof IOException) ? (IOException) cause : new IOException(cause);
+        readerException.compareAndSet(null, ioe);
+    }
+
+    @Override
+    public void notifyOnOperationComplete() {
+        // no-op
+    }
+}