You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:34 UTC

[20/31] incubator-distributedlog git commit: DL-162: Use log segment entry store interface

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
deleted file mode 100644
index 8529281..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ /dev/null
@@ -1,1503 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.readahead;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
-import com.twitter.distributedlog.AsyncNotification;
-import com.twitter.distributedlog.BKLogHandler;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LedgerDescriptor;
-import com.twitter.distributedlog.LedgerHandleCache;
-import com.twitter.distributedlog.LedgerReadPosition;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.LogReadException;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ReadAheadCache;
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.callback.ReadAheadCallback;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * ReadAhead Worker processes readahead asynchronously. The whole readahead process is chained into
- * different phases:
- *
- * <p>
- * ScheduleReadAheadPhase: Schedule the readahead request based on previous state (e.g. whether to backoff).
- * After the readahead request is scheduled, the worker enters the ReadAhead phase.
- * </p>
- * <p>
- * ReadAhead Phase: This phase is divided into several sub-phases. All the sub-phases are chained into the
- * execution flow. If errors happen during execution, the worker enters the Exceptions Handling Phase.
- * <br>
- * CheckInProgressChangedPhase: check whether any in-progress ledger has changed and update the metadata.
- * <br>
- * OpenLedgerPhase: open ledgers if necessary for the following read requests.
- * <br>
- * ReadEntriesPhase: read entries from bookkeeper and fill the readahead cache.
- * <br>
- * After that, the worker goes back to the Schedule Phase to schedule the next readahead request.
- * </p>
- * <p>
- * Exceptions Handling Phase: Handle all the exceptions and properly schedule the next readahead request.
- * </p>
- */
-public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncCloseable, LogSegmentListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class);
-
-    // Base value (in seconds) from which the schedule phase derives the zookeeper exception threshold
-    private static final int BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS = 30;
-    // Number of unexpected bookkeeper exceptions tolerated before the schedule phase sets readahead to error
-    private static final int BKC_UNEXPECTED_EXCEPTION_THRESHOLD = 3;
-
-    // Stream information
-    private final String fullyQualifiedName;
-    private final DistributedLogConfiguration conf;
-    private final DynamicDistributedLogConfiguration dynConf;
-    private final LogMetadataForReader logMetadata;
-    private final BKLogHandler bkLedgerManager;
-    private final boolean isHandleForReading;
-    // Notification to notify readahead status
-    protected final AsyncNotification notification;
-
-    // resources
-    protected final OrderedScheduler scheduler;
-    private final LedgerHandleCache handleCache;
-    private final ReadAheadCache readAheadCache;
-
-    // ReadAhead Status
-    // Cleared by asyncClose() and by the phases that stop the worker on error/interruption
-    volatile boolean running = true;
-    // Lazily created by StoppablePhase; satisfied when the worker stops or enters an
-    // error/interrupted/truncated state. asyncClose() waits on it when it is non-null.
-    Promise<Void> stopPromise = null;
-    private volatile boolean isCatchingUp = true;
-    private volatile boolean logDeleted = false;
-    private volatile boolean readAheadError = false;
-    private volatile boolean readAheadInterrupted = false;
-    private volatile boolean readingFromTruncated = false;
-
-    // Exceptions Handling
-    // Set by ExceptionHandlePhase, consumed (and reset) by ScheduleReadAheadPhase to decide on backoff
-    volatile boolean encounteredException = false;
-    private final AtomicInteger bkcZkExceptions = new AtomicInteger(0);
-    private final AtomicInteger bkcUnExpectedExceptions = new AtomicInteger(0);
-    // Computed in the constructor as (configured error threshold in millis) / (readahead wait time)
-    private final int noLedgerExceptionOnReadLACThreshold;
-    private final AtomicInteger bkcNoLedgerExceptionsOnReadLAC = new AtomicInteger(0);
-
-    // Read Ahead Positions
-    // Both positions are initialized in the constructor as copies of the caller-supplied start position
-    private final LedgerReadPosition startReadPosition;
-    protected LedgerReadPosition nextReadAheadPosition;
-
-    //
-    // LogSegments & Metadata Notification
-    //
-
-    // variables related to zookeeper watcher notification to interrupt long poll waits
-    // notificationLock guards reads and writes of metadataNotification (see asyncClose())
-    final Object notificationLock = new Object();
-    AsyncNotification metadataNotification = null;
-    volatile long metadataNotificationTimeMillis = -1L;
-
-    // variables related to log segments
-    private volatile boolean reInitializeMetadata = true;
-    private volatile boolean forceReadLogSegments = false;
-    volatile boolean inProgressChanged = false;
-    private LogSegmentMetadata currentMetadata = null;
-    private int currentMetadataIndex;
-    protected LedgerDescriptor currentLH;
-    // Segment list handed to start(); logSegmentList is the list currently being processed
-    private volatile List<LogSegmentMetadata> logSegmentListNotified;
-    private volatile List<LogSegmentMetadata> logSegmentList;
-
-    //
-    // ReadAhead Phases
-    //
-
-    // NOTE: declaration order matters, schedulePhase must be created before the phases that chain to it.
-    final Phase schedulePhase = new ScheduleReadAheadPhase();
-    final Phase exceptionHandler = new ExceptionHandlePhase(schedulePhase);
-    // Chain: stoppable check -> in-progress check -> open ledger -> read entries -> back to schedule
-    final Phase readAheadPhase =
-            new StoppablePhase(
-                    new CheckInProgressChangedPhase(
-                            new OpenLedgerPhase(
-                                    new ReadEntriesPhase(schedulePhase))));
-
-    //
-    // Stats, Tracing and Failure Injection
-    //
-
-    // failure injector
-    private final AsyncFailureInjector failureInjector;
-    // trace
-    protected final long metadataLatencyWarnThresholdMillis;
-    final ReadAheadTracker tracker;
-    // Stopped in resumeReadAhead() to measure how long the worker waited before being resumed
-    final Stopwatch resumeStopWatch;
-    final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted();
-    // Misc
-    private final boolean readAheadSkipBrokenEntries;
-    // Stats
-    private final AlertStatsLogger alertStatsLogger;
-    private final StatsLogger readAheadPerStreamStatsLogger;
-    private final Counter readAheadWorkerWaits;
-    private final Counter readAheadEntryPiggyBackHits;
-    private final Counter readAheadEntryPiggyBackMisses;
-    private final Counter readAheadReadLACAndEntryCounter;
-    private final Counter readAheadCacheFullCounter;
-    private final Counter readAheadSkippedBrokenEntries;
-    private final Counter idleReaderWarn;
-    private final OpStatsLogger readAheadReadEntriesStat;
-    private final OpStatsLogger readAheadCacheResumeStat;
-    private final OpStatsLogger readAheadLacLagStats;
-    private final OpStatsLogger longPollInterruptionStat;
-    private final OpStatsLogger metadataReinitializationStat;
-    private final OpStatsLogger notificationExecutionStat;
-    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
-
-    /**
-     * Build a readahead worker. The constructor only wires up state; nothing is scheduled
-     * until {@link #start(List)} is called.
-     *
-     * @param conf static configuration; the wait time, latency thresholds and the
-     *             skip-broken-entries flag are read from it here
-     * @param dynConf dynamic configuration, stored for later use
-     * @param logMetadata metadata of the log; supplies the fully qualified name and the log name
-     * @param ledgerManager log handler, stored as {@code bkLedgerManager}
-     * @param scheduler scheduler on which readahead tasks are submitted
-     * @param handleCache cache used to open and close ledger handles
-     * @param startPosition position to start reading from; copied, so the caller's instance is not retained
-     * @param readAheadCache cache handed to the tracker and stored for later use
-     * @param isHandleForReading stored as-is (its use is outside this section of the file)
-     * @param readAheadExceptionsLogger logger used to count exceptions per phase
-     * @param handlerStatsLogger parent stats logger; stats are registered under the "readahead_worker" scope
-     * @param readAheadPerStreamStatsLogger per-stream stats logger, passed to the tracker
-     * @param alertStatsLogger logger used to raise alerts
-     * @param failureInjector injector consulted when submitting and scheduling tasks
-     * @param notification notified when readahead enters an error state; null checks elsewhere
-     *                     in this class indicate it may be null
-     */
-    public ReadAheadWorker(DistributedLogConfiguration conf,
-                           DynamicDistributedLogConfiguration dynConf,
-                           LogMetadataForReader logMetadata,
-                           BKLogHandler ledgerManager,
-                           OrderedScheduler scheduler,
-                           LedgerHandleCache handleCache,
-                           LedgerReadPosition startPosition,
-                           ReadAheadCache readAheadCache,
-                           boolean isHandleForReading,
-                           ReadAheadExceptionsLogger readAheadExceptionsLogger,
-                           StatsLogger handlerStatsLogger,
-                           StatsLogger readAheadPerStreamStatsLogger,
-                           AlertStatsLogger alertStatsLogger,
-                           AsyncFailureInjector failureInjector,
-                           AsyncNotification notification) {
-        // Log information
-        this.fullyQualifiedName = logMetadata.getFullyQualifiedName();
-        this.conf = conf;
-        this.dynConf = dynConf;
-        this.logMetadata = logMetadata;
-        this.bkLedgerManager = ledgerManager;
-        this.isHandleForReading = isHandleForReading;
-        this.notification = notification;
-        // Resources
-        this.scheduler = scheduler;
-        this.handleCache = handleCache;
-        this.readAheadCache = readAheadCache;
-        // Readahead status
-        // Two independent copies: one stays fixed as the start marker, the other advances
-        this.startReadPosition = new LedgerReadPosition(startPosition);
-        this.nextReadAheadPosition = new LedgerReadPosition(startPosition);
-        // LogSegments
-
-        // Failure Detection
-        this.failureInjector = failureInjector;
-        // Tracing
-        this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();
-        // NOTE(review): integer division by the configured wait time; a wait time of 0 would
-        // throw ArithmeticException here. Confirm the configuration guarantees a positive value.
-        this.noLedgerExceptionOnReadLACThreshold =
-                conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() / conf.getReadAheadWaitTime();
-        this.tracker = new ReadAheadTracker(logMetadata.getLogName(), readAheadCache,
-                ReadAheadPhase.SCHEDULE_READAHEAD, readAheadPerStreamStatsLogger);
-        this.resumeStopWatch = Stopwatch.createUnstarted();
-        // Misc
-        this.readAheadSkipBrokenEntries = conf.getReadAheadSkipBrokenEntries();
-        // Stats
-        this.alertStatsLogger = alertStatsLogger;
-        this.readAheadPerStreamStatsLogger = readAheadPerStreamStatsLogger;
-        // All worker-level counters and op stats live under the "readahead_worker" scope
-        StatsLogger readAheadStatsLogger = handlerStatsLogger.scope("readahead_worker");
-        readAheadWorkerWaits = readAheadStatsLogger.getCounter("wait");
-        readAheadEntryPiggyBackHits = readAheadStatsLogger.getCounter("entry_piggy_back_hits");
-        readAheadEntryPiggyBackMisses = readAheadStatsLogger.getCounter("entry_piggy_back_misses");
-        readAheadReadEntriesStat = readAheadStatsLogger.getOpStatsLogger("read_entries");
-        readAheadReadLACAndEntryCounter = readAheadStatsLogger.getCounter("read_lac_and_entry_counter");
-        readAheadCacheFullCounter = readAheadStatsLogger.getCounter("cache_full");
-        readAheadSkippedBrokenEntries = readAheadStatsLogger.getCounter("skipped_broken_entries");
-        readAheadCacheResumeStat = readAheadStatsLogger.getOpStatsLogger("resume");
-        readAheadLacLagStats = readAheadStatsLogger.getOpStatsLogger("read_lac_lag");
-        longPollInterruptionStat = readAheadStatsLogger.getOpStatsLogger("long_poll_interruption");
-        notificationExecutionStat = readAheadStatsLogger.getOpStatsLogger("notification_execution");
-        metadataReinitializationStat = readAheadStatsLogger.getOpStatsLogger("metadata_reinitialization");
-        idleReaderWarn = readAheadStatsLogger.getCounter("idle_reader_warn");
-        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
-    }
-
-    /**
-     * Expose the position the worker will read next. Intended for tests only.
-     *
-     * @return the current next-readahead position
-     */
-    @VisibleForTesting
-    public LedgerReadPosition getNextReadAheadPosition() {
-        return this.nextReadAheadPosition;
-    }
-
-    /**
-     * @return the descriptor of the ledger currently held by the worker, or null if none is held
-     */
-    public LedgerDescriptor getCurrentLedgerDescriptor() {
-        return this.currentLH;
-    }
-
-    //
-    // ReadAhead Status
-    //
-
-    /**
-     * Put the readahead worker into the error state.
-     *
-     * <p>Records the error flag, moves the tracker to the ERROR phase, forwards the cause to the
-     * registered notification (if any) and completes the stop promise (if one was created) so a
-     * pending close does not have to wait for its timeout.
-     *
-     * @param tracker tracker used to record the ERROR phase
-     * @param cause the failure that put readahead into error; it is logged and passed to the notification
-     */
-    void setReadAheadError(ReadAheadTracker tracker, Throwable cause) {
-        // Pass the cause as the trailing argument so the stack trace ends up in the log,
-        // matching how other error logs in this class report their exception.
-        LOG.error("Read Ahead for {} is set to error.", logMetadata.getFullyQualifiedName(), cause);
-        readAheadError = true;
-        tracker.enterPhase(ReadAheadPhase.ERROR);
-        if (null != notification) {
-            notification.notifyOnError(cause);
-        }
-        if (null != stopPromise) {
-            FutureUtils.setValue(stopPromise, null);
-        }
-    }
-
-    /**
-     * Flag the readahead worker as interrupted: record the state in the tracker, report a
-     * {@link DLInterruptedException} to the registered notification (if any) and complete the
-     * stop promise (if one was created).
-     *
-     * @param tracker tracker used to record the INTERRUPTED phase
-     */
-    void setReadAheadInterrupted(ReadAheadTracker tracker) {
-        readAheadInterrupted = true;
-        tracker.enterPhase(ReadAheadPhase.INTERRUPTED);
-        if (notification != null) {
-            final String message = "ReadAhead worker for "
-                    + bkLedgerManager.getFullyQualifiedName() + " is interrupted.";
-            notification.notifyOnError(new DLInterruptedException(message));
-        }
-        if (stopPromise != null) {
-            FutureUtils.setValue(stopPromise, null);
-        }
-    }
-
-    /**
-     * Flag that readahead attempted to position on a truncated segment: record the state in the
-     * tracker, report an {@link AlreadyTruncatedTransactionException} to the registered
-     * notification (if any) and complete the stop promise (if one was created).
-     *
-     * @param tracker tracker used to record the TRUNCATED phase
-     */
-    void setReadingFromTruncated(ReadAheadTracker tracker) {
-        readingFromTruncated = true;
-        tracker.enterPhase(ReadAheadPhase.TRUNCATED);
-        if (notification != null) {
-            final String message = logMetadata.getFullyQualifiedName()
-                    + ": Trying to position read ahead to a segment that is marked truncated";
-            notification.notifyOnError(new AlreadyTruncatedTransactionException(message));
-        }
-        if (stopPromise != null) {
-            FutureUtils.setValue(stopPromise, null);
-        }
-    }
-
-    /**
-     * Record that the worker has stopped: move the tracker to the STOPPED phase and complete
-     * the stop promise (if one was created) so a pending close can finish.
-     */
-    private void setReadAheadStopped() {
-        tracker.enterPhase(ReadAheadPhase.STOPPED);
-        if (stopPromise != null) {
-            FutureUtils.setValue(stopPromise, null);
-        }
-        LOG.info("Stopped ReadAheadWorker for {}", fullyQualifiedName);
-    }
-
-    /**
-     * Throw the exception matching the worker's terminal state, if it is in one. The states are
-     * checked in a fixed order: deleted, truncated, interrupted, error. Returns normally when
-     * none of the flags is set.
-     *
-     * @throws LogNotFoundException if the log was flagged as deleted
-     * @throws AlreadyTruncatedTransactionException if readahead positioned on a truncated segment
-     * @throws DLInterruptedException if readahead was interrupted
-     * @throws LogReadException if readahead was put into the error state
-     */
-    public void checkClosedOrInError()
-            throws LogNotFoundException, LogReadException, DLInterruptedException,
-            AlreadyTruncatedTransactionException {
-        if (logDeleted) {
-            final String message = logMetadata.getFullyQualifiedName() + " is already deleted.";
-            throw new LogNotFoundException(message);
-        }
-        if (readingFromTruncated) {
-            final String message = String.format(
-                "%s: Trying to position read ahead a segment that is marked truncated",
-                logMetadata.getFullyQualifiedName());
-            throw new AlreadyTruncatedTransactionException(message);
-        }
-        if (readAheadInterrupted) {
-            final String message = String.format("%s: ReadAhead Thread was interrupted",
-                logMetadata.getFullyQualifiedName());
-            throw new DLInterruptedException(message);
-        }
-        if (readAheadError) {
-            final String message = String.format("%s: ReadAhead Thread encountered exceptions",
-                logMetadata.getFullyQualifiedName());
-            throw new LogReadException(message);
-        }
-    }
-
-    /**
-     * @return true once the worker is no longer flagged as catching up
-     */
-    public boolean isCaughtUp() {
-        final boolean stillCatchingUp = isCatchingUp;
-        return !stillCatchingUp;
-    }
-
-    /**
-     * Start the worker: mark it running, remember the supplied segment list and kick off the
-     * schedule phase.
-     *
-     * @param segmentList the log segments known at start time
-     */
-    public void start(List<LogSegmentMetadata> segmentList) {
-        LOG.debug("Starting ReadAhead Worker for {} : segments = {}",
-                fullyQualifiedName, segmentList);
-        this.running = true;
-        this.logSegmentListNotified = segmentList;
-        this.schedulePhase.process(BKException.Code.OK);
-    }
-
-    /**
-     * Stop the worker asynchronously.
-     *
-     * <p>Clears the running flag, unregisters the tracker gauge and completes any pending
-     * metadata notification so an active long poll is interrupted. If no stop promise was ever
-     * created the returned future is already complete; otherwise it completes when the stop
-     * promise does, bounded by twice the configured readahead wait time.
-     *
-     * @return a future that completes once the worker has stopped or the wait timed out
-     */
-    @Override
-    public Future<Void> asyncClose() {
-        LOG.info("Stopping Readahead worker for {}", fullyQualifiedName);
-        running = false;
-        // Unregister associated gauges to prevent GC spiral
-        this.tracker.unregisterGauge();
-
-        // Detach the pending long-poll notification under the lock, then fire it outside the
-        // lock so the currently active long poll is interrupted and completed.
-        final AsyncNotification pendingLongPoll;
-        synchronized (notificationLock) {
-            pendingLongPoll = metadataNotification;
-            metadataNotification = null;
-        }
-        if (pendingLongPoll != null) {
-            pendingLongPoll.notifyOnOperationComplete();
-        }
-
-        if (stopPromise == null) {
-            return Future.Void();
-        }
-        final Future<Void> boundedStop = FutureUtils.within(
-                stopPromise,
-                2 * conf.getReadAheadWaitTime(),
-                TimeUnit.MILLISECONDS,
-                new TimeoutException("Timeout on waiting for ReadAhead worker to stop " + fullyQualifiedName),
-                scheduler,
-                fullyQualifiedName);
-        return FutureUtils.ignore(boundedStop);
-    }
-
-    /**
-     * Render the worker's state for diagnostics: running flag, next read position, exception
-     * counters and flags, the current segment metadata ("NONE" when unset) and the failure injector.
-     *
-     * @return a single-line, comma separated "label:value" description of this worker
-     */
-    @Override
-    public String toString() {
-        return "Running:" + running +
-            ", NextReadAheadPosition:" + nextReadAheadPosition +
-            ", BKZKExceptions:" + bkcZkExceptions.get() +
-            ", BKUnexpectedExceptions:" + bkcUnExpectedExceptions.get() +
-            ", EncounteredException:" + encounteredException +
-            ", readAheadError:" + readAheadError +
-            // the ':' separator was missing here, which fused the label and its value
-            ", readAheadInterrupted:" + readAheadInterrupted +
-            ", CurrentMetadata:" + ((null != currentMetadata) ? currentMetadata : "NONE") +
-            ", FailureInjector:" + failureInjector;
-    }
-
-    /**
-     * Resume readahead after a wait: record how long the worker was paused (in microseconds)
-     * and resubmit the worker for execution. A stopwatch that was not running is logged and
-     * otherwise ignored, and the worker is resubmitted regardless.
-     */
-    @Override
-    public void resumeReadAhead() {
-        try {
-            resumeStopWatch.stop();
-            final long pausedMicros = resumeStopWatch.elapsed(TimeUnit.MICROSECONDS);
-            readAheadCacheResumeStat.registerSuccessfulEvent(pausedMicros);
-        } catch (IllegalStateException ise) {
-            LOG.error("Encountered illegal state when stopping resume stop watch for {} : ",
-                    logMetadata.getFullyQualifiedName(), ise);
-        }
-        submit(this);
-    }
-
-    /**
-     * Wrap a task so that a runtime exception escaping it is logged, puts readahead into the
-     * error state and is then rethrown.
-     *
-     * @param runnable the task to guard
-     * @return a runnable that delegates to {@code runnable} with the handling described above
-     */
-    Runnable addRTEHandler(final Runnable runnable) {
-        final Runnable guarded = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    runnable.run();
-                } catch (RuntimeException failure) {
-                    LOG.error("ReadAhead on stream {} encountered runtime exception",
-                            logMetadata.getFullyQualifiedName(), failure);
-                    setReadAheadError(tracker, failure);
-                    throw failure;
-                }
-            }
-        };
-        return guarded;
-    }
-
-    /**
-     * Adapt a function so that applying it only enqueues the real work: the returned function
-     * submits a task that applies {@code function} to the input and returns immediately.
-     *
-     * @param function the function to run via {@link #submit(Runnable)}
-     * @param <T> input type of the function
-     * @return a function that defers {@code function} through the worker's submit path
-     */
-    <T> Function1<T, BoxedUnit> submit(final Function1<T, BoxedUnit> function) {
-        return new AbstractFunction1<T, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(final T value) {
-                final Runnable deferredCall = new Runnable() {
-                    @Override
-                    public void run() {
-                        function.apply(value);
-                    }
-                };
-                submit(deferredCall);
-                return BoxedUnit.UNIT;
-            }
-        };
-    }
-
-    /**
-     * Submit a task to the scheduler, honoring failure injection: an injected stop drops the
-     * task (stalling readahead), an injected delay routes it through the delayed schedule path.
-     * A rejected submission puts readahead into the error state.
-     *
-     * @param runnable the task to execute
-     */
-    void submit(Runnable runnable) {
-        if (failureInjector.shouldInjectStops()) {
-            LOG.warn("Error injected: read ahead for stream {} is going to stall.",
-                    logMetadata.getFullyQualifiedName());
-            return;
-        }
-
-        if (failureInjector.shouldInjectDelays()) {
-            schedule(runnable, failureInjector.getInjectedDelayMs());
-            return;
-        }
-
-        final Runnable guarded = addRTEHandler(runnable);
-        try {
-            scheduler.submit(guarded);
-        } catch (RejectedExecutionException rejected) {
-            setReadAheadError(tracker, rejected);
-        }
-    }
-
-    /**
-     * Schedule a task after a delay, as an interruptible task registered for metadata
-     * notification. If registration reports that the task should run immediately it is
-     * submitted right away; otherwise it is scheduled with the delay and the wait counter is
-     * incremented. A rejected submission puts readahead into the error state.
-     *
-     * @param runnable the task to execute
-     * @param timeInMillis delay before execution, in milliseconds
-     */
-    private void schedule(Runnable runnable, long timeInMillis) {
-        try {
-            final InterruptibleScheduledRunnable interruptible = new InterruptibleScheduledRunnable(runnable);
-            if (setMetadataNotification(interruptible)) {
-                scheduler.submit(addRTEHandler(interruptible));
-            } else {
-                scheduler.schedule(addRTEHandler(interruptible), timeInMillis, TimeUnit.MILLISECONDS);
-                readAheadWorkerWaits.inc();
-            }
-        } catch (RejectedExecutionException rejected) {
-            setReadAheadError(tracker, rejected);
-        }
-    }
-
-    /**
-     * Count a bookkeeper failure against the phase it happened in, then hand the return code
-     * to the exception handling phase.
-     *
-     * @param phase the readahead phase in which the failure occurred; its name selects the stats logger
-     * @param returnCode the bookkeeper return code of the failure
-     */
-    private void handleException(ReadAheadPhase phase, int returnCode) {
-        readAheadExceptionsLogger.getBKExceptionStatsLogger(phase.name()).getExceptionCounter(returnCode).inc();
-        exceptionHandler.process(returnCode);
-    }
-
-    /**
-     * Close the ledger currently held by the worker, if any.
-     *
-     * <p>On success the current handle is cleared. On a bookkeeper failure the handle is kept
-     * and the failure is routed to the exception handling path.
-     *
-     * @return true if no ledger was held or it was closed; false if closing failed
-     */
-    private boolean closeCurrentLedgerHandle() {
-        if (null == currentLH) {
-            return true;
-        }
-        final LedgerDescriptor descriptor = currentLH;
-        try {
-            handleCache.closeLedger(descriptor);
-        } catch (BKException bke) {
-            LOG.debug("BK Exception during closing {} : ", descriptor, bke);
-            handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
-            return false;
-        }
-        currentLH = null;
-        return true;
-    }
-
-    /**
-     * One step of the readahead pipeline. A phase optionally holds the phase that follows it.
-     */
-    abstract class Phase {
-
-        /** The phase to hand over to, or null when this phase ends the chain. */
-        final Phase next;
-
-        Phase(Phase successor) {
-            this.next = successor;
-        }
-
-        /**
-         * Execute this phase.
-         *
-         * @param rc the bookkeeper return code the phase is entered with
-         */
-        abstract void process(int rc);
-    }
-
-    /**
-     * Schedules the next readahead request.
-     *
-     * <p>Without a recorded exception (and no injected error) the worker is submitted right
-     * away. Otherwise the phase either gives up, when too many zookeeper or unexpected
-     * bookkeeper exceptions were seen or an error is injected, or forces a metadata
-     * re-initialization and schedules the worker after a backoff of a quarter of the
-     * configured wait time.
-     */
-    final class ScheduleReadAheadPhase extends Phase {
-
-        ScheduleReadAheadPhase() {
-            super(null);
-        }
-
-        @Override
-        void process(int rc) {
-            if (!running) {
-                setReadAheadStopped();
-                return;
-            }
-            tracker.enterPhase(ReadAheadPhase.SCHEDULE_READAHEAD);
-
-            final boolean injectErrors = failureInjector.shouldInjectErrors();
-            if (!encounteredException && !injectErrors) {
-                // Nothing went wrong last round: run the next readahead immediately.
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Scheduling read ahead for {} now.", fullyQualifiedName);
-                }
-                submit(ReadAheadWorker.this);
-                return;
-            }
-
-            // Threshold is expressed in number of backoff rounds (each round waits waitTime / 4).
-            final int zkErrorThreshold = BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS * 1000 * 4 / conf.getReadAheadWaitTime();
-
-            if (injectErrors || (bkcZkExceptions.get() > zkErrorThreshold)) {
-                LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}",
-                          new Object[] { fullyQualifiedName, bkcZkExceptions.get(), injectErrors });
-                running = false;
-                setReadAheadError(tracker, new LogReadException(
-                        "Encountered too many zookeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
-                return;
-            }
-
-            if (bkcUnExpectedExceptions.get() > BKC_UNEXPECTED_EXCEPTION_THRESHOLD) {
-                LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.",
-                          fullyQualifiedName, bkcUnExpectedExceptions.get());
-                running = false;
-                setReadAheadError(tracker, new LogReadException(
-                        "Encountered too many unexpected bookkeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
-                return;
-            }
-
-            // The last attempt to read failed, so the metadata must be reinitialized.
-            reInitializeMetadata = true;
-            encounteredException = false;
-            // Back off before resuming
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Scheduling read ahead for {} after {} ms.", fullyQualifiedName, conf.getReadAheadWaitTime() / 4);
-            }
-            schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime() / 4);
-        }
-
-    }
-
-    /**
-     * Handles a failed readahead step.
-     *
-     * <p>An interruption stops the worker for good. Any other non-OK code is recorded: zookeeper
-     * failures and unexpected bookkeeper failures are counted, while missing entries, missing
-     * ledgers and ledger recovery failures are only flagged. The next phase is then entered with
-     * an OK code so the following readahead gets scheduled.
-     */
-    final class ExceptionHandlePhase extends Phase {
-
-        ExceptionHandlePhase(Phase next) {
-            super(next);
-        }
-
-        @Override
-        void process(int rc) {
-            tracker.enterPhase(ReadAheadPhase.EXCEPTION_HANDLING);
-
-            if (rc == BKException.Code.InterruptedException) {
-                LOG.trace("ReadAhead Worker for {} is interrupted.", fullyQualifiedName);
-                running = false;
-                setReadAheadInterrupted(tracker);
-                return;
-            }
-
-            if (rc == BKException.Code.ZKException) {
-                encounteredException = true;
-                final int zkFailures = bkcZkExceptions.incrementAndGet();
-                LOG.debug("ReadAhead Worker for {} encountered zookeeper exception : total exceptions are {}.",
-                        fullyQualifiedName, zkFailures);
-            } else if (rc != BKException.Code.OK) {
-                encounteredException = true;
-                // These codes are not counted against the unexpected-exception threshold.
-                final boolean tolerated = rc == BKException.Code.NoSuchEntryException
-                        || rc == BKException.Code.LedgerRecoveryException
-                        || rc == BKException.Code.NoSuchLedgerExistsException;
-                if (!tolerated) {
-                    bkcUnExpectedExceptions.incrementAndGet();
-                }
-                LOG.info("ReadAhead Worker for {} encountered exception : {}",
-                        fullyQualifiedName, BKException.create(rc));
-            }
-            // schedule next read ahead
-            next.process(BKException.Code.OK);
-        }
-    }
-
-    /**
-     * Entry gate of the readahead chain. If the worker is no longer running it records the
-     * stop and ends the chain; otherwise it makes sure a stop promise exists and proceeds to
-     * the next phase.
-     */
-    final class StoppablePhase extends Phase {
-
-        StoppablePhase(Phase next) {
-            super(next);
-        }
-
-        @Override
-        void process(int rc) {
-            if (!running) {
-                setReadAheadStopped();
-                return;
-            }
-
-            // Created lazily on the first pass through this phase.
-            if (stopPromise == null) {
-                stopPromise = new Promise<Void>();
-            }
-
-            // carry on with the readahead request
-            next.process(BKException.Code.OK);
-        }
-    }
-
-    /**
-     * Phase on checking in progress changed.
-     */
-    final class CheckInProgressChangedPhase extends Phase
-            implements FutureEventListener<Versioned<List<LogSegmentMetadata>>> {
-
-        CheckInProgressChangedPhase(Phase next) {
-            super(next);
-        }
-
-        void processLogSegments(final List<LogSegmentMetadata> segments) {
-            // submit callback execution to dlg executor to avoid deadlock.
-            submit(new Runnable() {
-                @Override
-                public void run() {
-                    logSegmentList = segments;
-                    boolean isInitialPositioning = nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition);
-                    for (int i = 0; i < logSegmentList.size(); i++) {
-                        LogSegmentMetadata l = logSegmentList.get(i);
-                        // By default we should skip truncated segments during initial positioning
-                        if (l.isTruncated() &&
-                            isInitialPositioning &&
-                            !conf.getIgnoreTruncationStatus()) {
-                            continue;
-                        }
-
-                        DLSN nextReadDLSN = new DLSN(nextReadAheadPosition.getLogSegmentSequenceNumber(),
-                                nextReadAheadPosition.getEntryId(), -1);
-
-                        // next read position still inside a log segment
-                        final boolean hasDataToRead = (l.getLastDLSN().compareTo(nextReadDLSN) >= 0);
-
-                        // either there is data to read in current log segment or we are moving over a log segment that is
-                        // still inprogress or was inprogress, we have check (or maybe close) this log segment.
-                        final boolean checkOrCloseLedger = hasDataToRead ||
-                            // next read position move over a log segment, if l is still inprogress or it was inprogress
-                            ((l.isInProgress() || (null != currentMetadata && currentMetadata.isInProgress())) &&
-                                    l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber());
-
-                        // If we are positioning on a partially truncated log segment then the truncation point should
-                        // be before the nextReadPosition
-                        if (l.isPartiallyTruncated() &&
-                            !isInitialPositioning &&
-                            (l.getMinActiveDLSN().compareTo(nextReadDLSN) > 0)) {
-                            if (conf.getAlertWhenPositioningOnTruncated()) {
-                                alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
-                                    nextReadAheadPosition, l);
-                            }
-
-                            if (!conf.getIgnoreTruncationStatus()) {
-                                LOG.error("{}: Trying to position reader on {} when {} is marked partially truncated",
-                                    new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
-                                setReadingFromTruncated(tracker);
-                                return;
-                            }
-                        }
-
-
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("CheckLogSegment : newMetadata = {}, currentMetadata = {}, nextReadAheadPosition = {}",
-                                      new Object[] { l, currentMetadata, nextReadAheadPosition});
-                        }
-
-                        if (checkOrCloseLedger) {
-                            long startBKEntry = 0;
-                            if (l.isPartiallyTruncated() && !conf.getIgnoreTruncationStatus()) {
-                                startBKEntry = l.getMinActiveDLSN().getEntryId();
-                            }
-
-                            if(l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()) {
-                                startBKEntry = Math.max(startBKEntry, nextReadAheadPosition.getEntryId());
-                                if (currentMetadata != null) {
-                                    inProgressChanged = currentMetadata.isInProgress() && !l.isInProgress();
-                                }
-                            } else {
-                                // We are positioning on a new ledger => reset the current ledger handle
-                                LOG.trace("Positioning {} on a new ledger {}", fullyQualifiedName, l);
-
-                                if (!closeCurrentLedgerHandle()) {
-                                    return;
-                                }
-                            }
-
-                            nextReadAheadPosition = new LedgerReadPosition(l.getLogSegmentId(), l.getLogSegmentSequenceNumber(), startBKEntry);
-                            if (conf.getTraceReadAheadMetadataChanges()) {
-                                LOG.info("Moved read position to {} for stream {} at {}.",
-                                         new Object[] {nextReadAheadPosition, logMetadata.getFullyQualifiedName(), System.currentTimeMillis() });
-                            }
-
-                            if (l.isTruncated()) {
-                                if (conf.getAlertWhenPositioningOnTruncated()) {
-                                    alertStatsLogger.raise("Trying to position reader on {} when {} is marked truncated",
-                                        nextReadAheadPosition, l);
-                                }
-
-                                if (!conf.getIgnoreTruncationStatus()) {
-                                    LOG.error("{}: Trying to position reader on {} when {} is marked truncated",
-                                        new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
-                                    setReadingFromTruncated(tracker);
-                                    return;
-                                }
-                            }
-
-                            currentMetadata = l;
-                            currentMetadataIndex = i;
-                            break;
-                        }
-
-                        // Handle multiple in progress => stop at the first in progress
-                        if (l.isInProgress()) {
-                            break;
-                        }
-                    }
-
-                    if (null == currentMetadata) {
-                        if (isCatchingUp) {
-                            isCatchingUp = false;
-                            if (isHandleForReading) {
-                                LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.",
-                                         new Object[] { fullyQualifiedName, System.currentTimeMillis(), nextReadAheadPosition });
-                            }
-                        }
-                        schedule(ReadAheadWorker.this, conf.getReadAheadWaitTimeOnEndOfStream());
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("No log segment to position on for {}. Backing off for {} millseconds",
-                                    fullyQualifiedName, conf.getReadAheadWaitTimeOnEndOfStream());
-                        }
-                    } else  {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Initialized metadata for {}, starting reading ahead from {} : {}.",
-                                    new Object[] { fullyQualifiedName, currentMetadataIndex, currentMetadata });
-                        }
-                        next.process(BKException.Code.OK);
-                    }
-
-                    // Once we have read the ledger list for the first time, subsequent segments
-                    // should be read in a streaming manner and we should get the new ledgers as
-                    // they close in ZK through watchers.
-                    // So lets start observing the latency
-                    bkLedgerManager.reportGetSegmentStats(true);
-                }
-            });
-        }
-
-        @Override
-        void process(int rc) {
-            if (!running) {
-                setReadAheadStopped();
-                return;
-            }
-
-            tracker.enterPhase(ReadAheadPhase.GET_LEDGERS);
-
-            inProgressChanged = false;
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Checking {} if InProgress changed.", fullyQualifiedName);
-            }
-
-            if (reInitializeMetadata || null == currentMetadata) {
-                reInitializeMetadata = false;
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Reinitializing metadata for {}.", fullyQualifiedName);
-                }
-                if (metadataNotificationTimeMillis > 0) {
-                    long metadataReinitializeTimeMillis = System.currentTimeMillis();
-                    long elapsedMillisSinceMetadataChanged = metadataReinitializeTimeMillis - metadataNotificationTimeMillis;
-                    if (elapsedMillisSinceMetadataChanged >= metadataLatencyWarnThresholdMillis) {
-                        LOG.warn("{} reinitialize metadata at {}, which is {} millis after receiving notification at {}.",
-                                 new Object[] { logMetadata.getFullyQualifiedName(), metadataReinitializeTimeMillis,
-                                                elapsedMillisSinceMetadataChanged, metadataNotificationTimeMillis});
-                    }
-                    metadataReinitializationStat.registerSuccessfulEvent(
-                            TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged));
-                    metadataNotificationTimeMillis = -1L;
-                }
-                if (forceReadLogSegments) {
-                    forceReadLogSegments = false;
-                    bkLedgerManager.readLogSegmentsFromStore(
-                            LogSegmentMetadata.COMPARATOR,
-                            LogSegmentFilter.DEFAULT_FILTER,
-                            null
-                    ).addEventListener(this);
-                } else {
-                    processLogSegments(logSegmentListNotified);
-                }
-            } else {
-                next.process(BKException.Code.OK);
-            }
-        }
-
-        /**
-         * Invoked when the log segment list read from the store completes
-         * successfully; unwraps the versioned value and processes the segments.
-         */
-        @Override
-        public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
-            final List<LogSegmentMetadata> fetchedSegments = segments.getValue();
-            processLogSegments(fetchedSegments);
-        }
-
-        /**
-         * Invoked when reading the log segment list from the store fails. Arms a
-         * forced re-read of the segments and reports the failure as a ZK
-         * exception in the GET_LEDGERS phase.
-         */
-        @Override
-        public void onFailure(Throwable cause) {
-            final String reason = cause.getMessage();
-            LOG.info("Encountered metadata exception while reading log segments of {} : {}. Retrying ...",
-                    bkLedgerManager.getFullyQualifiedName(),
-                    reason);
-
-            // Make the next pass through this phase go back to the store.
-            reInitializeMetadata = true;
-            forceReadLogSegments = true;
-
-            handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException);
-        }
-    }
-
-    final class OpenLedgerPhase extends Phase
-            implements BookkeeperInternalCallbacks.GenericCallback<LedgerDescriptor>,
-                        AsyncCallback.ReadLastConfirmedAndEntryCallback {
-
-        /**
-         * Create the phase.
-         *
-         * @param next
-         *          phase passed on to the {@code Phase} superclass constructor
-         */
-        OpenLedgerPhase(Phase next) {
-            super(next);
-        }
-
-        private void issueReadLastConfirmedAndEntry(final boolean parallel,
-                                                    final long lastAddConfirmed) {
-            final String ctx = String.format("ReadLastConfirmedAndEntry(%s, %d)", parallel? "Parallel":"Sequential", lastAddConfirmed);
-            final ReadLastConfirmedAndEntryCallbackWithNotification callback =
-                new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, ctx);
-            boolean callbackImmediately = setMetadataNotification(callback);
-            handleCache.asyncReadLastConfirmedAndEntry(
-                    currentLH,
-                    nextReadAheadPosition.getEntryId(),
-                    conf.getReadLACLongPollTimeout(),
-                    parallel
-            ).addEventListener(new FutureEventListener<Pair<Long, LedgerEntry>>() {
-                @Override
-                public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) {
-                    callback.readLastConfirmedAndEntryComplete(
-                            BKException.Code.OK,
-                            lacAndEntry.getLeft(),
-                            lacAndEntry.getRight(),
-                            ctx);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    callback.readLastConfirmedAndEntryComplete(
-                            FutureUtils.bkResultCode(cause),
-                            lastAddConfirmed,
-                            null,
-                            ctx);
-                }
-            });
-            callback.callbackImmediately(callbackImmediately);
-            readAheadReadLACAndEntryCounter.inc();
-        }
-
-        @Override
-        void process(int rc) {
-            if (!running) {
-                setReadAheadStopped();
-                return;
-            }
-
-            tracker.enterPhase(ReadAheadPhase.OPEN_LEDGER);
-
-            if (currentMetadata.isInProgress()) { // we don't want to fence the current journal
-                if (null == currentLH) {
-                    if (conf.getTraceReadAheadMetadataChanges()) {
-                        LOG.info("Opening inprogress ledger of {} for {} at {}.",
-                                 new Object[] { currentMetadata, fullyQualifiedName, System.currentTimeMillis() });
-                    }
-                    handleCache.asyncOpenLedger(currentMetadata, false)
-                            .addEventListener(new FutureEventListener<LedgerDescriptor>() {
-                                @Override
-                                public void onSuccess(LedgerDescriptor ld) {
-                                    operationComplete(BKException.Code.OK, ld);
-                                }
-
-                                @Override
-                                public void onFailure(Throwable cause) {
-                                    operationComplete(FutureUtils.bkResultCode(cause), null);
-                                }
-                            });
-                } else {
-                    final long lastAddConfirmed;
-                    try {
-                        lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
-                    } catch (BKException ie) {
-                        // Exception is thrown due to no ledger handle
-                        handleException(ReadAheadPhase.OPEN_LEDGER, ie.getCode());
-                        return;
-                    }
-
-                    if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
-                        // This indicates that the currentMetadata is still marked in
-                        // progress while we have already read all the entries. It might
-                        // indicate a failure to detect metadata change. So we
-                        // should probably try force read log segments if the reader has
-                        // been idle for after a while.
-                        if (LACNotAdvancedStopWatch.isRunning()) {
-                            if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS)
-                                > conf.getReaderIdleWarnThresholdMillis()) {
-                                idleReaderWarn.inc();
-                                LOG.info("{} Ledger {} for inprogress segment {}, reader has been idle for warn threshold {}",
-                                    new Object[] { fullyQualifiedName, currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() });
-                                reInitializeMetadata = true;
-                                forceReadLogSegments = true;
-                            }
-                        } else {
-                            LACNotAdvancedStopWatch.reset().start();
-                            if (conf.getTraceReadAheadMetadataChanges()) {
-                                LOG.info("{} Ledger {} for inprogress segment {} closed",
-                                    new Object[] { fullyQualifiedName, currentMetadata, currentLH });
-                            }
-                        }
-
-                        tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
-
-                        // the readahead is caught up if current ledger is in progress and read position moves over last add confirmed
-                        if (isCatchingUp) {
-                            isCatchingUp = false;
-                            if (isHandleForReading) {
-                                LOG.info("{} caught up at {}: lac = {}, position = {}.",
-                                         new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition });
-                            }
-                        }
-
-                        LOG.trace("Reading last add confirmed of {} for {}, as read poistion has moved over {} : {}",
-                                new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition });
-
-                        if (nextReadAheadPosition.getEntryId() == 0 && conf.getTraceReadAheadMetadataChanges()) {
-                            // we are waiting for first entry to arrive
-                            LOG.info("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
-                                     new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
-                        } else {
-                            LOG.trace("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
-                                      new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
-                        }
-                        issueReadLastConfirmedAndEntry(false, lastAddConfirmed);
-                    } else {
-                        next.process(BKException.Code.OK);
-                    }
-                }
-            } else {
-                LACNotAdvancedStopWatch.reset();
-                if (null != currentLH) {
-                    try {
-                        if (inProgressChanged) {
-                            LOG.trace("Closing completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
-                            if (!closeCurrentLedgerHandle()) {
-                                return;
-                            }
-                            inProgressChanged = false;
-                        } else {
-                            long lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
-                            if (nextReadAheadPosition.getEntryId() > lastAddConfirmed) {
-                                // Its possible that the last entryId does not account for the control
-                                // log record, but the lastAddConfirmed should never be short of the
-                                // last entry id; else we maybe missing an entry
-                                boolean gapDetected = false;
-                                if (lastAddConfirmed < currentMetadata.getLastEntryId()) {
-                                    alertStatsLogger.raise("Unexpected last entry id during read ahead; {} , {}",
-                                        currentMetadata, lastAddConfirmed);
-                                    gapDetected = true;
-                                }
-
-                                if (conf.getPositionGapDetectionEnabled() && gapDetected) {
-                                    setReadAheadError(tracker, new UnexpectedException(
-                                            "Unexpected last entry id during read ahead : " + currentMetadata
-                                                    + ", lac = " + lastAddConfirmed));
-                                } else {
-                                    // This disconnect will only surface during repositioning and
-                                    // will not silently miss records; therefore its safe to not halt
-                                    // reading, but we should print a warning for easy diagnosis
-                                    if (conf.getTraceReadAheadMetadataChanges() && lastAddConfirmed > (currentMetadata.getLastEntryId() + 1)) {
-                                        LOG.warn("Potential Metadata Corruption {} for stream {}, lastAddConfirmed {}",
-                                                new Object[] {currentMetadata, logMetadata.getFullyQualifiedName(), lastAddConfirmed});
-                                    }
-
-                                    LOG.trace("Past the last Add Confirmed {} in ledger {} for {}",
-                                              new Object[] { lastAddConfirmed, currentMetadata, fullyQualifiedName });
-                                    if (!closeCurrentLedgerHandle()) {
-                                        return;
-                                    }
-                                    LogSegmentMetadata oldMetadata = currentMetadata;
-                                    currentMetadata = null;
-                                    if (currentMetadataIndex + 1 < logSegmentList.size()) {
-                                        currentMetadata = logSegmentList.get(++currentMetadataIndex);
-                                        if (currentMetadata.getLogSegmentSequenceNumber() != (oldMetadata.getLogSegmentSequenceNumber() + 1)) {
-                                            // We should never get here as we should have exited the loop if
-                                            // pendingRequests were empty
-                                            alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}",
-                                                currentMetadata, oldMetadata);
-                                            setReadAheadError(tracker, new UnexpectedException(
-                                                    "Unexpected condition during read ahead : current metadata "
-                                                            + currentMetadata + ", old metadata " + oldMetadata));
-                                        } else {
-                                            if (currentMetadata.isTruncated()) {
-                                                if (conf.getAlertWhenPositioningOnTruncated()) {
-                                                    alertStatsLogger.raise("Trying to position reader on the log segment that is marked truncated : {}",
-                                                        currentMetadata);
-                                                }
-
-                                                if (!conf.getIgnoreTruncationStatus()) {
-                                                    LOG.error("{}: Trying to position reader on the log segment that is marked truncated : {}",
-                                                            logMetadata.getFullyQualifiedName(), currentMetadata);
-                                                    setReadingFromTruncated(tracker);
-                                                }
-                                            } else {
-                                                if (LOG.isTraceEnabled()) {
-                                                    LOG.trace("Moving read position to a new ledger {} for {}.",
-                                                        currentMetadata, fullyQualifiedName);
-                                                }
-                                                nextReadAheadPosition.positionOnNewLogSegment(currentMetadata.getLogSegmentId(), currentMetadata.getLogSegmentSequenceNumber());
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        if (!readAheadError) {
-                            next.process(BKException.Code.OK);
-                        }
-                    } catch (BKException bke) {
-                        LOG.debug("Exception while repositioning", bke);
-                        handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
-                    }
-                } else {
-                    LOG.trace("Opening completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
-                    handleCache.asyncOpenLedger(currentMetadata, true)
-                            .addEventListener(new FutureEventListener<LedgerDescriptor>() {
-                                @Override
-                                public void onSuccess(LedgerDescriptor ld) {
-                                    operationComplete(BKException.Code.OK, ld);
-                                }
-
-                                @Override
-                                public void onFailure(Throwable cause) {
-                                    operationComplete(FutureUtils.bkResultCode(cause), null);
-                                }
-                            });
-                }
-            }
-
-        }
-
-        /**
-         * Completion callback for opening a ledger. On success, records the
-         * opened handle, resets the exception counters and continues with the
-         * next phase; on failure, routes the result code to the exception handler.
-         *
-         * @param rc
-         *          result code of the open operation
-         * @param result
-         *          descriptor of the opened ledger; null on failure
-         */
-        @Override
-        public void operationComplete(final int rc, final LedgerDescriptor result) {
-            // submit callback execution to dlg executor to avoid deadlock.
-            final Runnable onOpenCompleted = new Runnable() {
-                @Override
-                public void run() {
-                    if (BKException.Code.OK == rc) {
-                        currentLH = result;
-                        if (conf.getTraceReadAheadMetadataChanges()) {
-                            LOG.info("Opened ledger of {} for {} at {}.",
-                                    new Object[]{currentMetadata, fullyQualifiedName, System.currentTimeMillis()});
-                        }
-                        bkcZkExceptions.set(0);
-                        bkcUnExpectedExceptions.set(0);
-                        bkcNoLedgerExceptionsOnReadLAC.set(0);
-                        next.process(rc);
-                    } else {
-                        LOG.debug("BK Exception {} while opening ledger", rc);
-                        handleException(ReadAheadPhase.OPEN_LEDGER, rc);
-                    }
-                }
-            };
-            submit(onOpenCompleted);
-        }
-
-        /**
-         * Handle a failed result of reading last add confirmed.
-         *
-         * <p>For {@code NoSuchLedgerExistsException} the read is retried after a backoff until
-         * the number of consecutive occurrences exceeds the configured threshold; then the
-         * current ledger handle is closed so it gets re-opened. Any other non-OK code is
-         * passed to the exception handler. An OK code is a no-op.
-         *
-         * @param rc
-         *          result of reading last add confirmed
-         */
-        private void handleReadLastConfirmedError(int rc) {
-            if (BKException.Code.NoSuchLedgerExistsException == rc) {
-                if (bkcNoLedgerExceptionsOnReadLAC.incrementAndGet() > noLedgerExceptionOnReadLACThreshold) {
-                    LOG.info("{} No entries published to ledger {} yet for {} millis.",
-                            new Object[] { fullyQualifiedName, currentLH, conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() });
-                    bkcNoLedgerExceptionsOnReadLAC.set(0);
-                    // set current ledger handle to null, so it would be re-opened again.
-                    // if the ledger does disappear, it would trigger re-initialize log segments on handling openLedger exceptions
-                    if (closeCurrentLedgerHandle()) {
-                        next.process(BKException.Code.OK);
-                    }
-                } else {
-                    // Log at the level the guard checks for, so the statement and its guard agree.
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("{} No entries published to ledger {} yet. Backoff reading ahead for {} ms.",
-                                new Object[]{fullyQualifiedName, currentLH, conf.getReadAheadWaitTime()});
-                    }
-                    // Backoff before resuming
-                    schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime());
-                }
-            } else if (BKException.Code.OK != rc) {
-                handleException(ReadAheadPhase.READ_LAST_CONFIRMED, rc);
-            }
-        }
-
-        /**
-         * Completion callback for a read-last-confirmed-and-entry request.
-         *
-         * <p>On success, an entry that matches the current read position is placed in the
-         * read ahead cache and the position is advanced (a piggy-back hit); anything else is
-         * counted as a miss. Either way the next phase is processed afterwards. Failures are
-         * delegated to {@code handleReadLastConfirmedError}.
-         *
-         * @param rc
-         *          result code of the read
-         * @param lastConfirmed
-         *          last add confirmed reported with the result
-         * @param entry
-         *          entry returned with the result; may be null
-         * @param ctx
-         *          request context; its string form is stored with the cached entry
-         */
-        public void readLastConfirmedAndEntryComplete(final int rc, final long lastConfirmed, final LedgerEntry entry,
-                                                      final Object ctx) {
-            // submit callback execution to dlg executor to avoid deadlock.
-            submit(new Runnable() {
-                @Override
-                public void run() {
-                    if (BKException.Code.OK != rc) {
-                        handleReadLastConfirmedError(rc);
-                        return;
-                    }
-
-                    // A successful read clears the consecutive-failure counters.
-                    bkcZkExceptions.set(0);
-                    bkcUnExpectedExceptions.set(0);
-                    bkcNoLedgerExceptionsOnReadLAC.set(0);
-
-                    if (LOG.isTraceEnabled()) {
-                        try {
-                            LOG.trace("Advancing Last Add Confirmed of {} for {} : {}, {}",
-                                      new Object[] { currentMetadata, fullyQualifiedName, lastConfirmed,
-                                                     handleCache.getLastAddConfirmed(currentLH) });
-                        } catch (BKException exc) {
-                            // Ignore
-                        }
-                    }
-
-                    // The entry is usable only if it is exactly the one the reader is waiting for.
-                    final boolean piggyBacked = (null != entry)
-                        && (nextReadAheadPosition.getEntryId() == entry.getEntryId())
-                        && (nextReadAheadPosition.getLedgerId() == entry.getLedgerId());
-
-                    if (!piggyBacked) {
-                        if (lastConfirmed > nextReadAheadPosition.getEntryId()) {
-                            LOG.info("{} : entry {} isn't piggybacked but last add confirmed already moves to {}.",
-                                     new Object[] { logMetadata.getFullyQualifiedName(), nextReadAheadPosition.getEntryId(), lastConfirmed });
-                        }
-                        readAheadEntryPiggyBackMisses.inc();
-                    } else {
-                        if (lastConfirmed <= 4 && conf.getTraceReadAheadMetadataChanges()) {
-                            LOG.info("Hit readLastConfirmedAndEntry for {} at {} : entry = {}, lac = {}, position = {}.",
-                                    new Object[] { fullyQualifiedName, System.currentTimeMillis(),
-                                            entry.getEntryId(), lastConfirmed, nextReadAheadPosition });
-                        }
-
-                        if (!isCatchingUp) {
-                            final long lacLag = lastConfirmed - nextReadAheadPosition.getEntryId();
-                            if (lacLag > 0) {
-                                readAheadLacLagStats.registerSuccessfulEvent(lacLag);
-                            }
-                        }
-
-                        nextReadAheadPosition.advance();
-
-                        final LedgerReadPosition cachedPosition = new LedgerReadPosition(
-                                entry.getLedgerId(), currentLH.getLogSegmentSequenceNo(), entry.getEntryId());
-                        final String reason = null != ctx ? ctx.toString() : "";
-                        readAheadCache.set(cachedPosition, entry, reason,
-                                           currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
-
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Reading the value received {} for {} : entryId {}",
-                                new Object[] { currentMetadata, fullyQualifiedName, entry.getEntryId() });
-                        }
-                        readAheadEntryPiggyBackHits.inc();
-                    }
-
-                    next.process(rc);
-                }
-            });
-        }
-    }
-
-    final class ReadEntriesPhase extends Phase implements Runnable {
-
-        boolean cacheFull = false;
-        long lastAddConfirmed = -1;
-
-        /**
-         * Create the phase.
-         *
-         * @param next
-         *          phase passed on to the {@code Phase} superclass constructor
-         */
-        ReadEntriesPhase(Phase next) {
-            super(next);
-        }
-
-        /**
-         * Enter the READ_ENTRIES phase: reset the per-round state, refresh the
-         * last add confirmed of the current ledger handle and start reading.
-         * Without a ledger handle the phase completes immediately.
-         *
-         * @param rc
-         *          result code handed over by the caller (not consulted here)
-         */
-        @Override
-        void process(int rc) {
-            if (!running) {
-                setReadAheadStopped();
-                return;
-            }
-
-            tracker.enterPhase(ReadAheadPhase.READ_ENTRIES);
-
-            cacheFull = false;
-            lastAddConfirmed = -1;
-
-            // No handle to read from => nothing to do in this round.
-            if (null == currentLH) {
-                complete();
-                return;
-            }
-
-            try {
-                lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
-            } catch (BKException bke) {
-                handleException(ReadAheadPhase.READ_LAST_CONFIRMED, bke.getCode());
-                return;
-            }
-            read();
-        }
-
-        private void read() {
-            if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Nothing to read for {} of {} : lastAddConfirmed = {}, nextReadAheadPosition = {}",
-                            new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition});
-                }
-                complete();
-                return;
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Reading entry {} for {} of {}.",
-                        new Object[] {nextReadAheadPosition, currentMetadata, fullyQualifiedName });
-            }
-            int readAheadBatchSize = dynConf.getReadAheadBatchSize();
-            final long startEntryId = nextReadAheadPosition.getEntryId();
-            final long endEntryId = Math.min(lastAddConfirmed, (nextReadAheadPosition.getEntryId() + readAheadBatchSize - 1));
-
-            if (endEntryId <= readAheadBatchSize && conf.getTraceReadAheadMetadataChanges()) {
-                // trace first read batch
-                LOG.info("Reading entries ({} - {}) for {} at {} : lac = {}, nextReadAheadPosition = {}.",
-                         new Object[] { startEntryId, endEntryId, fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
-            }
-
-            final String readCtx = String.format("ReadEntries(%d-%d)", startEntryId, endEntryId);
-            handleCache.asyncReadEntries(currentLH, startEntryId, endEntryId)
-                    .addEventListener(new FutureEventListener<Enumeration<LedgerEntry>>() {
-
-                        @Override
-                        public void onSuccess(Enumeration<LedgerEntry> entries) {
-                            int rc = BKException.Code.OK;
-
-                            if (failureInjector.shouldInjectCorruption(startEntryId, endEntryId)) {
-                                rc = BKException.Code.DigestMatchException;
-                            }
-                            readComplete(rc, null, entries, readCtx, startEntryId, endEntryId);
-                        }
-
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            readComplete(FutureUtils.bkResultCode(cause), null, null, readCtx, startEntryId, endEntryId);
-                        }
-                    });
-        }
-
-        public void readComplete(final int rc, final LedgerHandle lh,
-                                 final Enumeration<LedgerEntry> seq, final Object ctx,
-                                 final long startEntryId, final long endEntryId) {
-            // submit callback execution to dlg executor to avoid deadlock.
-            submit(new Runnable() {
-                @Override
-                public void run() {
-                    // If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and
-                    // stat the issue and move forward.
-                    if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) {
-                        readAheadReadEntriesStat.registerFailedEvent(0);
-                        LOG.error("BK DigestMatchException while reading entries {}-{} in stream {}, entry {} discarded",
-                                new Object[] { startEntryId, endEntryId, fullyQualifiedName, startEntryId });
-                        bkcZkExceptions.set(0);
-                        bkcUnExpectedExceptions.set(0);
-                        readAheadSkippedBrokenEntries.inc();
-                        nextReadAheadPosition.advance();
-                    } else if (BKException.Code.OK != rc) {
-                        readAheadReadEntriesStat.registerFailedEvent(0);
-                        LOG.debug("BK Exception {} while reading entry", rc);
-                        handleException(ReadAheadPhase.READ_ENTRIES, rc);
-                        return;
-                    } else {
-                        int numReads = 0;
-                        while (seq.hasMoreElements()) {
-                            bkcZkExceptions.set(0);
-                            bkcUnExpectedExceptions.set(0);
-                            nextReadAheadPosition.advance();
-                            LedgerEntry e = seq.nextElement();
-                            LedgerReadPosition readPosition = new LedgerReadPosition(e.getLedgerId(), currentMetadata.getLogSegmentSequenceNumber(), e.getEntryId());
-                            readAheadCache.set(readPosition, e, null != ctx ? ctx.toString() : "",
-                                    currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
-                            ++numReads;
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Read entry {} of {}.", readPosition, fullyQualifiedName);
-                            }
-                        }
-                        readAheadReadEntriesStat.registerSuccessfulEvent(numReads);
-                    }
-                    if (readAheadCache.isCacheFull()) {
-                        cacheFull = true;
-                        complete();
-                    } else {
-                        read();
-                    }
-                }
-            });
-        }
-
-        private void complete() {
-            if (cacheFull) {
-                LOG.trace("Cache for {} is full. Backoff reading until notified", fullyQualifiedName);
-                readAheadCacheFullCounter.inc();
-                resumeStopWatch.reset().start();
-                stopPromise = null;
-                readAheadCache.setReadAheadCallback(ReadAheadWorker.this);
-            } else {
-                run();
-            }
-        }
-
-        @Override
-        public void run() {
-            next.process(BKException.Code.OK);
-        }
-    }
-
-    @Override
-    public void run() {
-        if (!running) {
-            setReadAheadStopped();
-            return;
-        }
-        readAheadPhase.process(BKException.Code.OK);
-    }
-
-    @Override
-    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
-        AsyncNotification notification;
-        synchronized (notificationLock) {
-            logSegmentListNotified = segments;
-            reInitializeMetadata = true;
-            LOG.debug("{} Read ahead node changed", fullyQualifiedName);
-            notification = metadataNotification;
-            metadataNotification = null;
-        }
-        metadataNotificationTimeMillis = System.currentTimeMillis();
-        if (null != notification) {
-            notification.notifyOnOperationComplete();
-        }
-    }
-
-    @Override
-    public void onLogStreamDeleted() {
-        logDeleted = true;
-        setReadAheadError(tracker, new LogNotFoundException("Log stream "
-                + bkLedgerManager.getFullyQualifiedName() + " is deleted."));
-    }
-
-    /**
-     * Set metadata notification and return the flag indicating whether to reinitialize metadata.
-     *
-     * @param notification
-     *          metadata notification
-     * @return flag indicating whether to reinitialize metadata.
-     */
-    private boolean setMetadataNotification(AsyncNotification notification) {
-        synchronized (notificationLock) {
-            this.metadataNotification = notification;
-            return reInitializeMetadata;
-        }
-    }
-
-    @VisibleForTesting
-    public AsyncNotification getMetadataNotification() {
-        synchronized (notificationLock) {
-            return metadataNotification;
-        }
-    }
-
-    /**
-     * A scheduled runnable that could be waken and executed immediately when notification arrives.
-     *
-     * E.g
-     * <p>
-     * The reader reaches end of stream, it backs off to schedule next read in 2 seconds.
-     * <br/>
-     * if a new log segment is created, without this change, reader has to wait 2 seconds to read
-     * entries in new log segment, which means delivery latency of entries in new log segment could
-     * be up to 2 seconds. but with this change, the task would be executed immediately, which reader
-     * would be waken up from backoff, which would reduce the delivery latency.
-     * </p>
-     */
-    class InterruptibleScheduledRunnable implements AsyncNotification, Runnable {
-
-        final Runnable task;
-        final AtomicBoolean called = new AtomicBoolean(false);
-        final long startNanos;
-
-        InterruptibleScheduledRunnable(Runnable task) {
-            this.task = task;
-            this.startNanos = MathUtils.nowInNano();
-        }
-
-        @Override
-        public void notifyOnError(Throwable t) {
-            longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
-            execute();
-        }
-
-        @Override
-        public void notifyOnOperationComplete() {
-            longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
-            execute();
-        }
-
-        @Override
-        public void run() {
-            if (called.compareAndSet(false, true)) {
-                task.run();
-            }
-        }
-
-        void execute() {
-            if (called.compareAndSet(false, true)) {
-                submit(task);
-            }
-        }
-    }
-
-    abstract class LongPollNotification<T> implements AsyncNotification {
-
-        final long lac;
-        final T cb;
-        final Object ctx;
-        final AtomicBoolean called = new AtomicBoolean(false);
-        final long startNanos;
-
-        LongPollNotification(long lac, T cb, Object ctx) {
-            this.lac = lac;
-            this.cb = cb;
-            this.ctx = ctx;
-            this.startNanos = MathUtils.nowInNano();
-        }
-
-        void complete(boolean success) {
-            long startTime = MathUtils.nowInNano();
-            doComplete(success);
-            if (success) {
-                notificationExecutionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTime));
-            } else {
-                notificationExecutionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startTime));
-            }
-        }
-
-        abstract void doComplete(boolean success);
-
-        @Override
-        public void notifyOnError(Throwable cause) {
-            longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
-            complete(false);
-        }
-
-        @Override
-        public void notifyOnOperationComplete() {
-            longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
-            complete(true);
-        }
-
-        void callbackImmediately(boolean immediate) {
-            if (immediate) {
-                complete(true);
-            }
-        }
-    }
-
-    class ReadLastConfirmedAndEntryCallbackWithNotification
-            extends LongPollNotification<AsyncCallback.ReadLastConfirmedAndEntryCallback>
-            implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
-
-        ReadLastConfirmedAndEntryCallbackWithNotification(
-                long lac, AsyncCallback.ReadLastConfirmedAndEntryCallback cb, Object ctx) {
-            super(lac, cb, ctx);
-        }
-
-        @Override
-        public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) {
-            if (called.compareAndSet(false, true)) {
-                // clear the notification when callback
-                synchronized (notificationLock) {
-                    metadataNotification = null;
-                }
-                this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx);
-            }
-        }
-
-        @Override
-        void doComplete(boolean success) {
-            readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
deleted file mode 100644
index 326f92b..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.stats;
-
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Stats logger to log exceptions happened in {@link com.twitter.distributedlog.readahead.ReadAheadWorker}.
- * They are counters of exceptions happened on each read ahead phase:
- * <code>`scope`/exceptions/`phase`/`code`</code>. `scope` is the current scope of
- * stats logger, `phase` is the read ahead phase, while `code` is the exception code. Check
- * {@link com.twitter.distributedlog.readahead.ReadAheadPhase} for details about phases and
- * {@link BKExceptionStatsLogger} for details about `code`.
- */
-public class ReadAheadExceptionsLogger {
-
-    private final StatsLogger statsLogger;
-    private StatsLogger parentExceptionStatsLogger;
-    private final ConcurrentMap<String, BKExceptionStatsLogger> exceptionStatsLoggers =
-            new ConcurrentHashMap<String, BKExceptionStatsLogger>();
-
-    public ReadAheadExceptionsLogger(StatsLogger statsLogger) {
-        this.statsLogger = statsLogger;
-    }
-
-    public BKExceptionStatsLogger getBKExceptionStatsLogger(String phase) {
-        // initialize the parent exception stats logger lazily
-        if (null == parentExceptionStatsLogger) {
-            parentExceptionStatsLogger = statsLogger.scope("exceptions");
-        }
-        BKExceptionStatsLogger exceptionStatsLogger = exceptionStatsLoggers.get(phase);
-        if (null == exceptionStatsLogger) {
-            exceptionStatsLogger = new BKExceptionStatsLogger(parentExceptionStatsLogger.scope(phase));
-            BKExceptionStatsLogger oldExceptionStatsLogger =
-                    exceptionStatsLoggers.putIfAbsent(phase, exceptionStatsLogger);
-            if (null != oldExceptionStatsLogger) {
-                exceptionStatsLogger = oldExceptionStatsLogger;
-            }
-        }
-        return exceptionStatsLogger;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
deleted file mode 100644
index 1829e54..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.util.FutureUtils;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
-/**
- * Test {@link LedgerHandleCache}
- */
-public class TestLedgerHandleCache extends TestDistributedLogBase {
-    static final Logger LOG = LoggerFactory.getLogger(TestLedgerHandleCache.class);
-
-    protected static String ledgersPath = "/ledgers";
-
-    private ZooKeeperClient zkc;
-    private BookKeeperClient bkc;
-
-    @Before
-    public void setup() throws Exception {
-        zkc = TestZooKeeperClientBuilder.newBuilder()
-                .zkServers(zkServers)
-                .build();
-        bkc = BookKeeperClientBuilder.newBuilder()
-                .name("bkc")
-                .zkc(zkc)
-                .ledgersPath(ledgersPath)
-                .dlConfig(conf)
-                .build();
-    }
-
-    @After
-    public void teardown() throws Exception {
-        bkc.close();
-        zkc.close();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testBuilderWithoutBKC() throws Exception {
-        LedgerHandleCache.newBuilder().build();
-    }
-
-    @Test(timeout = 60000, expected = NullPointerException.class)
-    public void testBuilderWithoutStatsLogger() throws Exception {
-        LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).statsLogger(null).build();
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKBookieHandleNotAvailableException.class)
-    public void testOpenLedgerWhenBkcClosed() throws Exception {
-        BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder().name("newBkc")
-                .zkc(zkc).ledgersPath(ledgersPath).dlConfig(conf).build();
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build();
-        // closed the bkc
-        newBkc.close();
-        // open ledger after bkc closed.
-        cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("", 2, 1, 1).setRegionId(1).build(), false);
-    }
-
-    @Test(timeout = 60000, expected = BKException.ZKException.class)
-    public void testOpenLedgerWhenZkClosed() throws Exception {
-        ZooKeeperClient newZkc = TestZooKeeperClientBuilder.newBuilder()
-                .name("zkc-openledger-when-zk-closed")
-                .zkServers(zkServers)
-                .build();
-        BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder()
-                .name("bkc-openledger-when-zk-closed")
-                .zkc(newZkc)
-                .ledgersPath(ledgersPath)
-                .dlConfig(conf)
-                .build();
-        try {
-            LedgerHandle lh = newBkc.get().createLedger(BookKeeper.DigestType.CRC32, "zkcClosed".getBytes(UTF_8));
-            lh.close();
-            newZkc.close();
-            LedgerHandleCache cache =
-                    LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build();
-            // open ledger after zkc closed
-            cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("",
-                    2, lh.getId(), 1).setLogSegmentSequenceNo(lh.getId()).build(), false);
-        } finally {
-            newBkc.close();
-        }
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
-    public void testReadLastConfirmedWithoutOpeningLedger() throws Exception {
-        LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        // read last confirmed
-        cache.tryReadLastConfirmed(desc);
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
-    public void testReadEntriesWithoutOpeningLedger() throws Exception {
-        LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        // read entries
-        cache.readEntries(desc, 0, 10);
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
-    public void testGetLastConfirmedWithoutOpeningLedger() throws Exception {
-        LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        // read entries
-        cache.getLastAddConfirmed(desc);
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
-    public void testReadLastConfirmedAndEntryWithoutOpeningLedger() throws Exception {
-        LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        // read entries
-        FutureUtils.bkResult(cache.asyncReadLastConfirmedAndEntry(desc, 1L, 200L, false));
-    }
-
-    @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
-    public void testGetLengthWithoutOpeningLedger() throws Exception {
-        LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        // read entries
-        cache.getLength(desc);
-    }
-
-    @Test(timeout = 60000)
-    public void testOpenAndCloseLedger() throws Exception {
-        LedgerHandle lh = bkc.get().createLedger(1, 1, 1,
-                BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
-        LedgerHandleCache cache =
-                LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
-        LogSegmentMetadata segment = new LogSegmentMetadata.LogSegmentMetadataBuilder(
-                "/data", LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID, lh.getId(), 0L)
-                .build();
-        LedgerDescriptor desc1 = cache.openLedger(segment, false);
-        assertTrue(cache.handlesMap.containsKey(desc1));
-        LedgerHandleCache.RefCountedLedgerHandle refLh = cache.handlesMap.get(desc1);
-        assertEquals(1, refLh.getRefCount());
-        cache.openLedger(segment, false);
-        assertTrue(cache.handlesMap.containsKey(desc1));
-        assertEquals(2, refLh.getRefCount());
-        // close the ledger
-        cache.closeLedger(desc1);
-        assertTrue(cache.handlesMap.containsKey(desc1));
-        assertEquals(1, refLh.getRefCount());
-        cache.closeLedger(desc1);
-        assertFalse(cache.handlesMap.containsKey(desc1));
-        assertEquals(0, refLh.getRefCount());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
index 573ae5c..74a5231 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
@@ -99,7 +99,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
                 true);
         LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
                 conf,
-                bkc.get(),
+                bkc,
                 scheduler,
                 NullStatsLogger.INSTANCE,
                 AsyncFailureInjector.NULL);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
index 998c7ba..4358a8e 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
@@ -34,8 +34,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
 
 import static org.junit.Assert.*;
 
@@ -52,42 +50,22 @@ public class TestReadUtils extends TestDistributedLogBase {
     private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
             BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception {
         List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments();
-        final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
-                .bkc(bkdlm.getWriterBKC())
-                .conf(conf)
-                .build();
         return ReadUtils.getLogRecordNotLessThanTxId(
                 bkdlm.getStreamName(),
                 logSegments.get(logsegmentIdx),
                 transactionId,
                 Executors.newSingleThreadExecutor(),
-                handleCache,
+                bkdlm.getReaderEntryStore(),
                 10
-        ).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
-        });
+        );
     }
 
     private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
         List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments();
-        final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
-                .bkc(bkdlm.getWriterBKC())
-                .conf(conf)
-                .build();
         return ReadUtils.asyncReadFirstUserRecord(
                 bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
-                handleCache, dlsn
-        ).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
-        });
+                bkdlm.getReaderEntryStore(), dlsn
+        );
     }
 
     private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
@@ -98,20 +76,10 @@ public class TestReadUtils extends TestDistributedLogBase {
                         LogSegmentFilter.DEFAULT_FILTER,
                         null)
         ).getValue();
-        final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
-                .bkc(bkdlm.getWriterBKC())
-                .conf(conf)
-                .build();
         return ReadUtils.asyncReadLastRecord(
                 bkdlm.getStreamName(), ledgerList.get(ledgerNo), false, false, false, 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
-                handleCache
-        ).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
-        });
+                bkdlm.getReaderEntryStore()
+        );
     }
 
     @Test(timeout = 60000)