You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/28 01:05:24 UTC

[16/20] incubator-distributedlog git commit: DL-112: Improve SyncReader to access readahead cache directly

DL-112: Improve SyncReader to access readahead cache directly

This changes the implementation of SyncReader to let it access the read ahead cache directly, rather
than calling AsyncReader to fetch the entries. It would reduce the function calls that SyncReader spends
on reading individual records.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/e690a8cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/e690a8cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/e690a8cd

Branch: refs/heads/master
Commit: e690a8cd496a46f2085184668f98475d239510b4
Parents: b5d44cc
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Nov 21 17:35:40 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 27 16:49:29 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    |  21 +-
 .../distributedlog/BKDistributedLogManager.java |  22 +-
 .../distributedlog/BKLogReadHandler.java        |   4 +
 .../distributedlog/BKSyncLogReaderDLSN.java     | 252 ++++++++++---------
 .../twitter/distributedlog/ReadAheadCache.java  |  12 +-
 .../NonBlockingReadsTestUtil.java               |   2 +-
 .../distributedlog/TestBKSyncLogReader.java     |   4 +-
 .../TestNonBlockingReadsMultiReader.java        |   2 +-
 8 files changed, 169 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index e11d7a3..cf792e3 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -97,7 +97,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
     private int lastPosition = 0;
     private final boolean positionGapDetectionEnabled;
     private final int idleErrorThresholdMillis;
-    private final ScheduledFuture<?> idleReaderTimeoutTask;
+    final ScheduledFuture<?> idleReaderTimeoutTask;
     private ScheduledFuture<?> backgroundScheduleTask = null;
 
     protected Promise<Void> closeFuture = null;
@@ -312,6 +312,17 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         return null;
     }
 
+    void cancelIdleReaderTask() {
+        // Do this after we have checked that the reader was not previously closed
+        try {
+            if (null != idleReaderTimeoutTask) {
+                idleReaderTimeoutTask.cancel(true);
+            }
+        } catch (Exception exc) {
+            LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
+        }
+    }
+
     protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
         if (readAheadStarted) {
             throw new UnexpectedException("Could't reset from dlsn after reader already starts reading.");
@@ -471,13 +482,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         }
 
         // Do this after we have checked that the reader was not previously closed
-        try {
-            if (null != idleReaderTimeoutTask) {
-                idleReaderTimeoutTask.cancel(true);
-            }
-        } catch (Exception exc) {
-            LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
-        }
+        cancelIdleReaderTask();
 
         synchronized (scheduleLock) {
             if (null != backgroundScheduleTask) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index ae8ae12..ac37f3a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -75,7 +75,6 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKUtil;
@@ -408,7 +407,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         this.readAheadExceptionsLogger = readAheadExceptionsLogger;
     }
 
-    private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
+    synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
         if (createIfNull && null == lockStateExecutor && ownExecutor) {
             lockStateExecutor = OrderedScheduler.newBuilder()
                     .corePoolSize(1).name("BKDL-LockState").build();
@@ -1106,18 +1105,13 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
             throws IOException {
         LOG.info("Create async reader starting from {}", fromDLSN);
         checkClosedOrInError("getInputStream");
-        Optional<String> subscriberId = Optional.absent();
-        BKAsyncLogReaderDLSN asyncReader = new BKAsyncLogReaderDLSN(
+        LogReader reader = new BKSyncLogReaderDLSN(
+                conf,
                 this,
-                scheduler,
-                getLockStateExecutor(true),
                 fromDLSN,
-                subscriberId,
-                true,
-                dynConf.getDeserializeRecordSetOnReads(),
+                fromTxnId,
                 statsLogger);
-        pendingReaders.add(asyncReader);
-        return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId);
+        return reader;
     }
 
     /**
@@ -1325,17 +1319,17 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     static class PendingReaders implements AsyncCloseable {
 
         final ExecutorService executorService;
-        final Set<AsyncLogReader> readers = new HashSet<AsyncLogReader>();
+        final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>();
 
         PendingReaders(ExecutorService executorService) {
             this.executorService = executorService;
         }
 
-        public synchronized void remove(AsyncLogReader reader) {
+        public synchronized void remove(AsyncCloseable reader) {
             readers.remove(reader);
         }
 
-        public synchronized void add(AsyncLogReader reader) {
+        public synchronized void add(AsyncCloseable reader) {
             readers.add(reader);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index faa47fc..30a96ff 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -482,6 +482,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         return readAheadCache.getNextReadAheadEntry();
     }
 
+    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
+        return readAheadCache.getNextReadAheadEntry(waitTime, waitTimeUnit);
+    }
+
     public ReadAheadCache getReadAheadCache() {
         return readAheadCache;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index bce12b6..ded318c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -19,171 +19,175 @@ package com.twitter.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.twitter.distributedlog.callback.ReadAheadCallback;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
+import com.twitter.distributedlog.exceptions.IdleReaderException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Synchronous Log Reader based on {@link AsyncLogReader}
  */
-class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<LogRecordWithDLSN>, ReadAheadCallback {
+class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
 
-    private final BKAsyncLogReaderDLSN reader;
-    private final ScheduledExecutorService executorService;
-    private final LinkedBlockingQueue<LogRecordWithDLSN> readAheadRecords;
+    private final BKLogReadHandler readHandler;
     private final AtomicReference<IOException> readerException =
             new AtomicReference<IOException>(null);
-    private final int maxNumCachedRecords;
     private final int maxReadAheadWaitTime;
-    private ReadAheadCallback readAheadCallback = null;
     private Promise<Void> closeFuture;
     private final Optional<Long> startTransactionId;
-    private final DLSN startDLSN;
-    // lock on variables that would be accessed by both background threads and foreground threads
-    private final Object sharedLock = new Object();
+    private boolean positioned = false;
+    private Entry.Reader currentEntry = null;
+
+    // idle reader settings
+    private final boolean shouldCheckIdleReader;
+    private final int idleErrorThresholdMillis;
+
+    // Stats
+    private final Counter idleReaderError;
 
     BKSyncLogReaderDLSN(DistributedLogConfiguration conf,
-                        BKAsyncLogReaderDLSN reader,
-                        ScheduledExecutorService executorService,
-                        Optional<Long> startTransactionId) {
-        this.maxNumCachedRecords = conf.getReadAheadMaxRecords();
+                        BKDistributedLogManager bkdlm,
+                        DLSN startDLSN,
+                        Optional<Long> startTransactionId,
+                        StatsLogger statsLogger) {
+        this.readHandler = bkdlm.createReadHandler(
+                Optional.<String>absent(),
+                bkdlm.getLockStateExecutor(true),
+                this,
+                conf.getDeserializeRecordSetOnReads(),
+                true);
         this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
-        this.reader = reader;
-        this.executorService = executorService;
-        this.readAheadRecords = new LinkedBlockingQueue<LogRecordWithDLSN>();
+        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
+        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
         this.startTransactionId = startTransactionId;
-        this.startDLSN = reader.getStartDLSN();
-        scheduleReadNext();
+        readHandler.startReadAhead(
+                new LedgerReadPosition(startDLSN),
+                AsyncFailureInjector.NULL);
+        if (!startTransactionId.isPresent()) {
+            positioned = true;
+        }
+
+        // Stats
+        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
+        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
     }
 
     @VisibleForTesting
-    BKAsyncLogReaderDLSN getAsyncReader() {
-        return reader;
+    BKLogReadHandler getReadHandler() {
+        return readHandler;
     }
 
-    private void scheduleReadNext() {
-        synchronized (sharedLock) {
-            if (null != closeFuture) {
-                return;
-            }
-        }
-        this.executorService.submit(this);
-    }
+    // reader is still catching up, waiting for next record
 
-    private void invokeReadAheadCallback() {
-        synchronized (sharedLock) {
-            if (null != readAheadCallback) {
-                readAheadCallback.resumeReadAhead();
-                readAheadCallback = null;
+    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
+        Entry.Reader entry = null;
+        if (nonBlocking) {
+            return readHandler.getNextReadAheadEntry();
+        } else {
+            while (!readHandler.isReadAheadCaughtUp()
+                    && null == readerException.get()
+                    && null == entry) {
+                entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
+                        TimeUnit.MILLISECONDS);
             }
-        }
-    }
-
-    private void setReadAheadCallback(ReadAheadCallback callback) {
-        synchronized (sharedLock) {
-            this.readAheadCallback = callback;
-            if (readAheadRecords.size() < maxNumCachedRecords) {
-                invokeReadAheadCallback();
+            if (null != entry) {
+                return entry;
+            }
+            // reader is caught up
+            if (readHandler.isReadAheadCaughtUp()
+                    && null == entry
+                    && null == readerException.get()) {
+                entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
+                        TimeUnit.MILLISECONDS);
             }
+            return entry;
         }
     }
 
-    // Background Read Future Listener
-
-    @Override
-    public void resumeReadAhead() {
-        scheduleReadNext();
+    private void markReaderAsIdle() throws IdleReaderException {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for more than " + idleErrorThresholdMillis + " ms");
+        readerException.compareAndSet(null, ire);
+        throw ire;
     }
 
-    @Override
-    public void onSuccess(LogRecordWithDLSN record) {
-        if (!startTransactionId.isPresent() || record.getTransactionId() >= startTransactionId.get()) {
-            readAheadRecords.add(record);
-        }
-        if (readAheadRecords.size() >= maxNumCachedRecords) {
-            setReadAheadCallback(this);
-        } else {
-            scheduleReadNext();
-        }
-    }
 
     @Override
-    public void onFailure(Throwable cause) {
-        if (cause instanceof IOException) {
-            readerException.compareAndSet(null, (IOException) cause);
-        } else {
-            readerException.compareAndSet(null, new IOException("Encountered exception on reading "
-                    + reader.getStreamName() + " : ", cause));
+    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
+            throws IOException {
+        if (null != readerException.get()) {
+            throw readerException.get();
         }
-    }
 
-    // Background Read
+        LogRecordWithDLSN record = doReadNext(nonBlocking);
 
-    @Override
-    public void run() {
-        this.reader.readNext().addEventListener(this);
+        // no record is returned, check if the reader becomes idle
+        if (null == record && shouldCheckIdleReader) {
+            ReadAheadCache cache = readHandler.getReadAheadCache();
+            if (cache.getNumCachedEntries() <= 0 &&
+                    cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                markReaderAsIdle();
+            }
+        }
+
+        return record;
     }
 
-    @Override
-    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
+    private synchronized LogRecordWithDLSN doReadNext(boolean nonBlocking)
             throws IOException {
-        if (null != readerException.get()) {
-            throw readerException.get();
-        }
         LogRecordWithDLSN record = null;
-        if (nonBlocking) {
-            record = readAheadRecords.poll();
-        } else {
-            try {
-                // reader is still catching up, waiting for next record
-                while (!reader.bkLedgerManager.isReadAheadCaughtUp()
-                        && null == readerException.get()
-                        && null == record) {
-                    record = readAheadRecords.poll(maxReadAheadWaitTime,
-                            TimeUnit.MILLISECONDS);
-                }
-                // reader caught up
-                boolean shallWait = true;
-                while (shallWait
-                        && reader.bkLedgerManager.isReadAheadCaughtUp()
-                        && null == record
-                        && null == readerException.get()) {
-                    record = readAheadRecords.poll(maxReadAheadWaitTime,
-                            TimeUnit.MILLISECONDS);
-                    if (null != record) {
-                        break;
+
+        do {
+            // fetch one record until we don't find any entry available in the readahead cache
+            while (null == record) {
+                if (null == currentEntry) {
+                    currentEntry = readNextEntry(nonBlocking);
+                    if (null == currentEntry) {
+                        return null;
                     }
-                    shallWait = reader.hasMoreRecords();
                 }
-            } catch (InterruptedException e) {
-                throw new DLInterruptedException("Interrupted on waiting next available log record for stream "
-                        + reader.getStreamName(), e);
+                record = currentEntry.nextRecord();
+                if (null == record) {
+                    currentEntry = null;
+                }
             }
-        }
-        if (null != readerException.get()) {
-            throw readerException.get();
-        }
-        if (null != record) {
+
+            // check if we reached the end of stream
             if (record.isEndOfStream()) {
                 EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
-                                        + reader.bkLedgerManager.getFullyQualifiedName());
+                        + readHandler.getFullyQualifiedName());
                 readerException.compareAndSet(null, eos);
                 throw eos;
             }
-            invokeReadAheadCallback();
-        }
+            // skip control records
+            if (record.isControl()) {
+                record = null;
+                continue;
+            }
+            if (!positioned) {
+                if (record.getTransactionId() < startTransactionId.get()) {
+                    record = null;
+                    continue;
+                } else {
+                    positioned = true;
+                    break;
+                }
+            } else {
+                break;
+            }
+        } while (true);
         return record;
     }
 
@@ -209,13 +213,13 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
     @Override
     public Future<Void> asyncClose() {
         Promise<Void> closePromise;
-        synchronized (sharedLock) {
+        synchronized (this) {
             if (null != closeFuture) {
                 return closeFuture;
             }
             closeFuture = closePromise = new Promise<Void>();
         }
-        reader.asyncClose().proxyTo(closePromise);
+        readHandler.asyncClose().proxyTo(closePromise);
         return closePromise;
     }
 
@@ -225,18 +229,20 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
     }
 
     //
-    // Test Methods
+    // Notification From ReadHandler
     //
-    @VisibleForTesting
-    void disableReadAheadLogSegmentsNotification() {
-        reader.bkLedgerManager.disableReadAheadLogSegmentsNotification();
-    }
 
-    @VisibleForTesting
-    LedgerReadPosition getReadAheadPosition() {
-        if (null != reader.bkLedgerManager.readAheadWorker) {
-            return reader.bkLedgerManager.readAheadWorker.getNextReadAheadPosition();
+    @Override
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            readerException.compareAndSet(null, (IOException) cause);
+        } else {
+            readerException.compareAndSet(null, new IOException(cause));
         }
-        return null;
+    }
+
+    @Override
+    public void notifyOnOperationComplete() {
+        // no-op
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
index d6051f6..284b327 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
 import com.twitter.distributedlog.callback.ReadAheadCallback;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
 import com.twitter.distributedlog.exceptions.LogReadException;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -107,11 +108,20 @@ public class ReadAheadCache {
      * @throws IOException
      */
     public Entry.Reader getNextReadAheadEntry() throws IOException {
+        return getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+    }
+
+    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
         if (null != lastException.get()) {
             throw lastException.get();
         }
 
-        Entry.Reader entry = readAheadEntries.poll();
+        Entry.Reader entry = null;
+        try {
+            entry = readAheadEntries.poll(waitTime, waitTimeUnit);
+        } catch (InterruptedException e) {
+            throw new DLInterruptedException("Interrupted on polling readahead entries : ", e);
+        }
 
         if (null != entry) {
             if (!isCacheFull()) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
index fb69c8d..c34eb6e 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
@@ -65,7 +65,7 @@ class NonBlockingReadsTestUtil {
         try {
             LOG.info("Created reader reading from {}", dlm.getStreamName());
             if (forceStall) {
-                reader.disableReadAheadLogSegmentsNotification();
+                reader.getReadHandler().disableReadAheadLogSegmentsNotification();
             }
 
             long numTrans = 0;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
index c452317..c029dca 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
@@ -162,7 +162,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
 
         // wait until readahead caught up
-        while (!reader.getAsyncReader().bkLedgerManager.isReadAheadCaughtUp()) {
+        while (!reader.getReadHandler().isReadAheadCaughtUp()) {
             TimeUnit.MILLISECONDS.sleep(20);
         }
 
@@ -178,7 +178,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         logger.info("Write another 10 records");
 
         // wait until readahead move on
-        while (reader.getAsyncReader().bkLedgerManager
+        while (reader.getReadHandler()
                 .readAheadWorker.getNextReadAheadPosition().getEntryId() < 21) {
             TimeUnit.MILLISECONDS.sleep(20);
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/e690a8cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
index f513a28..775c99d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -150,7 +150,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
             while (writeCount.get() > (readerThreads[0].getReadCount())) {
                 LOG.info("Write Count = {}, Read Count = {}, ReadAhead = {}",
                         new Object[] { writeCount.get(), readerThreads[0].getReadCount(),
-                                        reader0.getReadAheadPosition() });
+                                        reader0.getReadHandler().getReadAheadCache() });
                 TimeUnit.MILLISECONDS.sleep(100);
             }
             assertEquals(writeCount.get(),