Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:08 UTC

[03/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
new file mode 100644
index 0000000..0b8c55a
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -0,0 +1,992 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Futures;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * New ReadAhead Reader that uses {@link org.apache.distributedlog.logsegment.LogSegmentEntryReader}.
+ *
+ * NOTE: all the state changes happen in the same thread. All *unsafe* methods should be submitted to the ordered
+ * scheduler using the stream name as the key.
+ */
+public class ReadAheadEntryReader implements
+        AsyncCloseable,
+        LogSegmentListener,
+        LogSegmentEntryReader.StateChangeListener,
+        FutureEventListener<List<Entry.Reader>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
+
+    //
+    // Static Functions
+    //
+
+    private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+        @Override
+        public BoxedUnit apply(LogSegmentEntryReader reader) {
+            reader.start();
+            return BoxedUnit.UNIT;
+        }
+    };
+
+    //
+    // Internal Classes
+    //
+
+    class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {
+
+        private LogSegmentMetadata metadata;
+        private final long startEntryId;
+        private Future<LogSegmentEntryReader> openFuture = null;
+        private LogSegmentEntryReader reader = null;
+        private boolean isStarted = false;
+        private boolean isClosed = false;
+
+        SegmentReader(LogSegmentMetadata metadata,
+                      long startEntryId) {
+            this.metadata = metadata;
+            this.startEntryId = startEntryId;
+        }
+
+        synchronized LogSegmentEntryReader getEntryReader() {
+            return reader;
+        }
+
+        synchronized boolean isBeyondLastAddConfirmed() {
+            return null != reader && reader.isBeyondLastAddConfirmed();
+        }
+
+        synchronized LogSegmentMetadata getSegment() {
+            return metadata;
+        }
+
+        synchronized boolean isReaderOpen() {
+            return null != openFuture;
+        }
+
+        synchronized void openReader() {
+            if (null != openFuture) {
+                return;
+            }
+            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
+        }
+
+        synchronized boolean isReaderStarted() {
+            return isStarted;
+        }
+
+        synchronized void startRead() {
+            if (isStarted) {
+                return;
+            }
+            isStarted = true;
+            if (null != reader) {
+                reader.start();
+            } else {
+                openFuture.onSuccess(START_READER_FUNC);
+            }
+        }
+
+        synchronized Future<List<Entry.Reader>> readNext() {
+            if (null != reader) {
+                checkCatchingUpStatus(reader);
+                return reader.readNext(numReadAheadEntries);
+            } else {
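+                // the reader is not open yet; chain the read onto the open future
+                // so that open failures surface on the first read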
+                return openFuture.flatMap(readFunc);
+            }
+        }
+
+        synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) {
+            if (null != reader) {
+                reader.onLogSegmentMetadataUpdated(segment);
+                this.metadata = segment;
+            } else {
+                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(LogSegmentEntryReader reader) {
+                        reader.onLogSegmentMetadataUpdated(segment);
+                        synchronized (SegmentReader.this) {
+                            SegmentReader.this.metadata = segment;
+                        }
+                        return BoxedUnit.UNIT;
+                    }
+                });
+            }
+        }
+
+        @Override
+        public synchronized void onSuccess(LogSegmentEntryReader reader) {
+            this.reader = reader;
+            if (reader.getSegment().isInProgress()) {
+                reader.registerListener(ReadAheadEntryReader.this);
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            // no-op, the failure will be propagated on first read.
+        }
+
+        synchronized boolean isClosed() {
+            return isClosed;
+        }
+
+        synchronized Future<Void> close() {
+            if (null == openFuture) {
+                return Future.Void();
+            }
+            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
+                @Override
+                public Future<Void> apply(LogSegmentEntryReader reader) {
+                    return reader.asyncClose();
+                }
+            }).ensure(new Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    synchronized (SegmentReader.this) {
+                        isClosed = true;
+                    }
+                    return BoxedUnit.UNIT;
+                }
+            });
+        }
+    }
+
+    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {
+
+        private final int numEntries;
+
+        ReadEntriesFunc(int numEntries) {
+            this.numEntries = numEntries;
+        }
+
+        @Override
+        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
+            checkCatchingUpStatus(reader);
+            return reader.readNext(numEntries);
+        }
+    }
+
+    private abstract class CloseableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            synchronized (ReadAheadEntryReader.this) {
+                if (null != closePromise) {
+                    return;
+                }
+            }
+            try {
+                safeRun();
+            } catch (Throwable cause) {
+                logger.error("Caught unexpected exception : ", cause);
+            }
+        }
+
+        abstract void safeRun();
+
+    }
+
+    //
+    // Functions
+    //
+    private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
+    private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
+        @Override
+        public BoxedUnit apply() {
+            removeClosedSegmentReaders();
+            return BoxedUnit.UNIT;
+        }
+    };
+
+    //
+    // Resources
+    //
+    private final DistributedLogConfiguration conf;
+    private final BKLogReadHandler readHandler;
+    private final LogSegmentEntryStore entryStore;
+    private final OrderedScheduler scheduler;
+
+    //
+    // Parameters
+    //
+    private final String streamName;
+    private final DLSN fromDLSN;
+    private final int maxCachedEntries;
+    private final int numReadAheadEntries;
+    private final int idleWarnThresholdMillis;
+
+    //
+    // Cache
+    //
+    private final LinkedBlockingQueue<Entry.Reader> entryQueue;
+
+    //
+    // State of the reader
+    //
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private boolean isInitialized = false;
+    private boolean readAheadPaused = false;
+    private Promise<Void> closePromise = null;
+    // segment readers
+    private long currentSegmentSequenceNumber;
+    private SegmentReader currentSegmentReader;
+    private SegmentReader nextSegmentReader;
+    private DLSN lastDLSN;
+    private final EntryPosition nextEntryPosition;
+    private volatile boolean isCatchingUp = true;
+    private final LinkedList<SegmentReader> segmentReaders;
+    private final LinkedList<SegmentReader> segmentReadersToClose;
+    // last exception that this reader encounters
+    private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null);
+    // last entry added time
+    private final Stopwatch lastEntryAddedTime;
+    // state change notification
+    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications =
+            new CopyOnWriteArraySet<AsyncNotification>();
+    // idle reader check task
+    private final ScheduledFuture<?> idleReaderCheckTask;
+
+    //
+    // Stats
+    //
+    private final AlertStatsLogger alertStatsLogger;
+
+    public ReadAheadEntryReader(String streamName,
+                                DLSN fromDLSN,
+                                DistributedLogConfiguration conf,
+                                BKLogReadHandler readHandler,
+                                LogSegmentEntryStore entryStore,
+                                OrderedScheduler scheduler,
+                                Ticker ticker,
+                                AlertStatsLogger alertStatsLogger) {
+        this.streamName = streamName;
+        this.fromDLSN = lastDLSN = fromDLSN;
+        this.nextEntryPosition = new EntryPosition(
+                fromDLSN.getLogSegmentSequenceNo(),
+                fromDLSN.getEntryId());
+        this.conf = conf;
+        this.maxCachedEntries = conf.getReadAheadMaxRecords();
+        this.numReadAheadEntries = conf.getReadAheadBatchSize();
+        this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
+        this.readHandler = readHandler;
+        this.entryStore = entryStore;
+        this.scheduler = scheduler;
+        this.readFunc = new ReadEntriesFunc(numReadAheadEntries);
+        this.alertStatsLogger = alertStatsLogger;
+
+        // create the segment reader list
+        this.segmentReaders = new LinkedList<SegmentReader>();
+        this.segmentReadersToClose = new LinkedList<SegmentReader>();
+        // create the readahead entry queue
+        this.entryQueue = new LinkedBlockingQueue<Entry.Reader>();
+
+        // start the idle reader detection
+        lastEntryAddedTime = Stopwatch.createStarted(ticker);
+        // start the idle reader check task
+        idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();
+    }
+
+    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+        if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
+            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
+                        return;
+                    }
+                    // the readahead has been idle
+                    unsafeCheckIfReadAheadIsIdle();
+                }
+            }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    private void unsafeCheckIfReadAheadIsIdle() {
+        boolean forceReadLogSegments =
+                (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed();
+        if (forceReadLogSegments) {
+            readHandler.readLogSegmentsFromStore(
+                    LogSegmentMetadata.COMPARATOR,
+                    LogSegmentFilter.DEFAULT_FILTER,
+                    null
+            ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    // do nothing here since it would be retried on the next idle reader check
+                }
+
+                @Override
+                public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+                    onSegmentsUpdated(segments.getValue());
+                }
+            });
+        }
+    }
+
+    private void cancelIdleReaderTask() {
+        if (null != idleReaderCheckTask) {
+            idleReaderCheckTask.cancel(true);
+        }
+    }
+
+    @VisibleForTesting
+    EntryPosition getNextEntryPosition() {
+        return nextEntryPosition;
+    }
+
+    @VisibleForTesting
+    SegmentReader getCurrentSegmentReader() {
+        return currentSegmentReader;
+    }
+
+    @VisibleForTesting
+    long getCurrentSegmentSequenceNumber() {
+        return currentSegmentSequenceNumber;
+    }
+
+    @VisibleForTesting
+    SegmentReader getNextSegmentReader() {
+        return nextSegmentReader;
+    }
+
+    @VisibleForTesting
+    LinkedList<SegmentReader> getSegmentReaders() {
+        return segmentReaders;
+    }
+
+    @VisibleForTesting
+    boolean isInitialized() {
+        return isInitialized;
+    }
+
+    private void orderedSubmit(Runnable runnable) {
+        synchronized (this) {
+            if (null != closePromise) {
+                return;
+            }
+        }
+        try {
+            scheduler.submit(streamName, runnable);
+        } catch (RejectedExecutionException ree) {
+            logger.debug("Failed to submit and execute an operation for readahead entry reader of {}",
+                    streamName, ree);
+        }
+    }
+
+    public void start(final List<LogSegmentMetadata> segmentList) {
+        logger.info("Starting the readahead entry reader for {} : segments = {}",
+                readHandler.getFullyQualifiedName(), segmentList);
+        started.set(true);
+        processLogSegments(segmentList);
+    }
+
+    private void removeClosedSegmentReaders() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeRemoveClosedSegmentReaders();
+            }
+        });
+    }
+
+    private void unsafeRemoveClosedSegmentReaders() {
+        SegmentReader reader = segmentReadersToClose.peekFirst();
+        while (null != reader) {
+            if (reader.isClosed()) {
+                segmentReadersToClose.pollFirst();
+                reader = segmentReadersToClose.peekFirst();
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        final Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+        }
+
+        // cancel the idle reader task
+        cancelIdleReaderTask();
+
+        // use a plain Runnable here instead of CloseableRunnable,
+        // because this task must be executed even though the close promise is already set
+        try {
+            scheduler.submit(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    unsafeAsyncClose(closeFuture);
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
+                    streamName, ree);
+            unsafeAsyncClose(closeFuture);
+        }
+
+        return closeFuture;
+    }
+
+    private void unsafeAsyncClose(Promise<Void> closePromise) {
+        List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
+                segmentReaders.size() + segmentReadersToClose.size() + 1);
+        if (null != currentSegmentReader) {
+            segmentReadersToClose.add(currentSegmentReader);
+        }
+        if (null != nextSegmentReader) {
+            segmentReadersToClose.add(nextSegmentReader);
+        }
+        for (SegmentReader reader : segmentReaders) {
+            segmentReadersToClose.add(reader);
+        }
+        segmentReaders.clear();
+        for (SegmentReader reader : segmentReadersToClose) {
+            closeFutures.add(reader.close());
+        }
+        Futures.collect(closeFutures).proxyTo(closePromise);
+    }
+
+    //
+    // Reader State Changes
+    //
+
+    ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
+        this.stateChangeNotifications.add(notification);
+        return this;
+    }
+
+    ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
+        this.stateChangeNotifications.remove(notification);
+        return this;
+    }
+
+    private void notifyStateChangeOnSuccess() {
+        for (AsyncNotification notification : stateChangeNotifications) {
+            notification.notifyOnOperationComplete();
+        }
+    }
+
+    private void notifyStateChangeOnFailure(Throwable cause) {
+        for (AsyncNotification notification : stateChangeNotifications) {
+            notification.notifyOnError(cause);
+        }
+    }
+
+    void setLastException(IOException cause) {
+        if (!lastException.compareAndSet(null, cause)) {
+            logger.debug("last exception has already been set", lastException.get());
+        }
+        // the exception is set; notify the state change
+        notifyStateChangeOnFailure(cause);
+    }
+
+    void checkLastException() throws IOException {
+        if (null != lastException.get()) {
+            throw lastException.get();
+        }
+    }
+
+    void checkCatchingUpStatus(LogSegmentEntryReader reader) {
+        if (reader.getSegment().isInProgress()
+                && isCatchingUp
+                && reader.hasCaughtUpOnInprogress()) {
+            logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
+                    new Object[] { readHandler.getFullyQualifiedName(),
+                            reader.getLastAddConfirmed(), reader.getSegment() });
+            isCatchingUp = false;
+        }
+    }
+
+    void markCaughtup() {
+        if (isCatchingUp) {
+            isCatchingUp = false;
+            logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName());
+        }
+    }
+
+    public boolean isReadAheadCaughtUp() {
+        return !isCatchingUp;
+    }
+
+    @Override
+    public void onCaughtupOnInprogress() {
+        markCaughtup();
+    }
+
+    //
+    // ReadAhead State Machine
+    //
+
+    @Override
+    public void onSuccess(List<Entry.Reader> entries) {
+        lastEntryAddedTime.reset().start();
+        for (Entry.Reader entry : entries) {
+            entryQueue.add(entry);
+        }
+        if (!entries.isEmpty()) {
+            Entry.Reader lastEntry = entries.get(entries.size() - 1);
+            nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1);
+        }
+        // notify on data available
+        notifyStateChangeOnSuccess();
+        if (entryQueue.size() >= maxCachedEntries) {
+            pauseReadAheadOnCacheFull();
+        } else {
+            scheduleReadNext();
+        }
+    }
+
+    @Override
+    public void onFailure(Throwable cause) {
+        if (cause instanceof EndOfLogSegmentException) {
+            // we reach end of the log segment
+            moveToNextLogSegment();
+            return;
+        }
+        if (cause instanceof IOException) {
+            setLastException((IOException) cause);
+        } else {
+            setLastException(new UnexpectedException("Unexpected non I/O exception", cause));
+        }
+    }
+
+    private synchronized void invokeReadAhead() {
+        if (readAheadPaused) {
+            scheduleReadNext();
+            readAheadPaused = false;
+        }
+    }
+
+    private synchronized void pauseReadAheadOnCacheFull() {
+        this.readAheadPaused = true;
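+        // re-check the cache after marking readahead paused: if entries were drained
+        // concurrently, resume immediately to avoid a missed wakeup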
+        if (!isCacheFull()) {
+            invokeReadAhead();
+        }
+    }
+
+    private synchronized void pauseReadAheadOnNoMoreLogSegments() {
+        this.readAheadPaused = true;
+    }
+
+    //
+    // Cache Related Methods
+    //
+
+    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
+        if (null != lastException.get()) {
+            throw lastException.get();
+        }
+        Entry.Reader entry;
+        try {
+            entry = entryQueue.poll(waitTime, waitTimeUnit);
+        } catch (InterruptedException e) {
+            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
+        }
+        try {
+            return entry;
+        } finally {
+            // resume readahead after consuming an entry, if the cache is no longer full
+            if (null != entry && !isCacheFull()) {
+                invokeReadAhead();
+            }
+        }
+    }
+
+    /**
+     * Return the number of cached entries.
+     *
+     * @return number of cached entries.
+     */
+    public int getNumCachedEntries() {
+        return entryQueue.size();
+    }
+
+    /**
+     * Return if the cache is full.
+     *
+     * @return true if the cache is full, otherwise false.
+     */
+    public boolean isCacheFull() {
+        return getNumCachedEntries() >= maxCachedEntries;
+    }
+
+    @VisibleForTesting
+    public boolean isCacheEmpty() {
+        return entryQueue.isEmpty();
+    }
+
+    /**
+     * Check whether the readahead has stalled.
+     *
+     * @param idleReaderErrorThreshold idle reader error threshold
+     * @param timeUnit time unit of the idle reader error threshold
+     * @return true if the readahead has stalled, otherwise false.
+     */
+    public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
+        return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold);
+    }
+
+    //
+    // LogSegment Management
+    //
+
+    void processLogSegments(final List<LogSegmentMetadata> segments) {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeProcessLogSegments(segments);
+            }
+        });
+    }
+
+    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
+        if (isInitialized) {
+            unsafeReinitializeLogSegments(segments);
+        } else {
+            unsafeInitializeLogSegments(segments);
+        }
+    }
+
+    /**
+     * Update the log segment metadata.
+     *
+     * @param reader the reader to update the metadata
+     * @param newMetadata the new metadata received
+     * @return true if the update succeeds, false on encountering errors
+     */
+    private boolean updateLogSegmentMetadata(SegmentReader reader,
+                                             LogSegmentMetadata newMetadata) {
+        if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+                    + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
+            return false;
+        }
+        if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
+            setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata
+                    + " received after a closed log segment " + reader.getSegment() + " on reading segment "
+                    + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName));
+            return false;
+        }
+        if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
+            reader.updateLogSegmentMetadata(newMetadata);
+        }
+        return true;
+    }
+
+    /**
+     * Reinitialize the log segments when the log segment list is updated.
+     */
+    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
+        logger.info("Reinitialize log segments with {}", segments);
+        int segmentIdx = 0;
+        for (; segmentIdx < segments.size(); segmentIdx++) {
+            LogSegmentMetadata segment = segments.get(segmentIdx);
+            if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) {
+                continue;
+            }
+            break;
+        }
+        if (segmentIdx >= segments.size()) {
+            return;
+        }
+        LogSegmentMetadata segment = segments.get(segmentIdx);
+        if (null != currentSegmentReader) {
+            if (!updateLogSegmentMetadata(currentSegmentReader, segment)) {
+                return;
+            }
+        } else {
+            if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+                setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+                        + streamName + " : current segment sn = " + currentSegmentSequenceNumber
+                        + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
+                return;
+            }
+        }
+        segmentIdx++;
+        if (segmentIdx >= segments.size()) {
+            return;
+        }
+        // check next segment
+        segment = segments.get(segmentIdx);
+        if (null != nextSegmentReader) {
+            if (!updateLogSegmentMetadata(nextSegmentReader, segment)) {
+                return;
+            }
+            segmentIdx++;
+        }
+        // check the segment readers in the queue
+        for (int readerIdx = 0;
+             readerIdx < segmentReaders.size() && segmentIdx < segments.size();
+             readerIdx++, segmentIdx++) {
+            SegmentReader reader = segmentReaders.get(readerIdx);
+            segment = segments.get(segmentIdx);
+            if (!updateLogSegmentMetadata(reader, segment)) {
+                return;
+            }
+        }
+        // add the remaining segments to the reader queue
+        for (; segmentIdx < segments.size(); segmentIdx++) {
+            segment = segments.get(segmentIdx);
+            SegmentReader reader = new SegmentReader(segment, 0L);
+            reader.openReader();
+            segmentReaders.add(reader);
+        }
+        if (null == currentSegmentReader) {
+            unsafeMoveToNextLogSegment();
+        }
+        // resume readahead if necessary
+        invokeReadAhead();
+    }
+
+    /**
+     * Initialize the reader with the log <i>segments</i>.
+     *
+     * @param segments list of log segments
+     */
+    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
+        if (segments.isEmpty()) {
+            // do not initialize the background reader until the first log segment is notified
+            return;
+        }
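+        // truncated log segments may only be skipped while positioning the reader at its start;
+        // once the first readable segment is kept, later truncated segments are treated as errors
+        // (unless the truncation status is ignored)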
+        boolean skipTruncatedLogSegments = true;
+        DLSN dlsnToStart = fromDLSN;
+        // positioning the reader
+        for (int i = 0; i < segments.size(); i++) {
+            LogSegmentMetadata segment = segments.get(i);
+            // skip any log segments that have smaller log segment sequence numbers
+            if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) {
+                continue;
+            }
+            // if the log segment is truncated, skip it.
+            if (skipTruncatedLogSegments &&
+                    !conf.getIgnoreTruncationStatus() &&
+                    segment.isTruncated()) {
+                continue;
+            }
+            // if the log segment is partially truncated, move the start dlsn to the min active dlsn
+            if (skipTruncatedLogSegments &&
+                    !conf.getIgnoreTruncationStatus() &&
+                    segment.isPartiallyTruncated()) {
+                if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+                    dlsnToStart = segment.getMinActiveDLSN();
+                }
+            }
+            skipTruncatedLogSegments = false;
+            if (!isAllowedToPosition(segment, dlsnToStart)) {
+                logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart);
+                return;
+            }
+
+            SegmentReader reader = new SegmentReader(segment,
+                    segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo()
+                            ? dlsnToStart.getEntryId() : 0L);
+            segmentReaders.add(reader);
+        }
+        if (segmentReaders.isEmpty()) {
+            // do not initialize the background reader until the first log segment is available to read
+            return;
+        }
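+        // start reading from the first positioned segment, then open the remaining readers for prefetching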
+        currentSegmentReader = segmentReaders.pollFirst();
+        currentSegmentReader.openReader();
+        currentSegmentReader.startRead();
+        currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+        unsafeReadNext(currentSegmentReader);
+        if (!segmentReaders.isEmpty()) {
+            for (SegmentReader reader : segmentReaders) {
+                reader.openReader();
+            }
+            unsafePrefetchNextSegment(true);
+        }
+        // mark the reader initialized
+        isInitialized = true;
+    }
+
+    private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
+        SegmentReader nextReader = segmentReaders.peekFirst();
+        // start reading the next log segment (only if it is inprogress when onlyInprogressLogSegment is set)
+        if (null != nextReader) {
+            if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) {
+                return;
+            }
+            nextReader.startRead();
+            nextSegmentReader = nextReader;
+            segmentReaders.pollFirst();
+        }
+    }
+
+    /**
+     * Check if we are allowed to position the reader at <i>fromDLSN</i>.
+     *
+     * @return true if it is allowed, otherwise false.
+     */
+    private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
+        if (segment.isTruncated()
+                && segment.getLastDLSN().compareTo(fromDLSN) >= 0
+                && !conf.getIgnoreTruncationStatus()) {
+            setLastException(new AlreadyTruncatedTransactionException(streamName
+                    + " : trying to position read ahead at " + fromDLSN
+                    + " on a segment " + segment + " that is already marked as truncated"));
+            return false;
+        }
+        if (segment.isPartiallyTruncated() &&
+                segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+            if (conf.getAlertWhenPositioningOnTruncated()) {
+                alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
+                    fromDLSN, segment);
+            }
+            if (!conf.getIgnoreTruncationStatus()) {
+                logger.error("{}: Trying to position reader on {} when {} is marked partially truncated",
+                        new Object[]{ streamName, fromDLSN, segment });
+
+                setLastException(new AlreadyTruncatedTransactionException(streamName
+                        + " : trying to position read ahead at " + fromDLSN
+                        + " on a segment " + segment + " that is already marked as truncated"));
+                return false;
+            }
+        }
+        return true;
+    }
+
+    void moveToNextLogSegment() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeMoveToNextLogSegment();
+            }
+        });
+    }
+
+    private void unsafeMoveToNextLogSegment() {
+        if (null != currentSegmentReader) {
+            segmentReadersToClose.add(currentSegmentReader);
+            currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
+            logger.debug("close current segment reader {}", currentSegmentReader.getSegment());
+            currentSegmentReader = null;
+        }
+        boolean hasSegmentToRead = false;
+        if (null != nextSegmentReader) {
+            currentSegmentReader = nextSegmentReader;
+            logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+            currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+            nextSegmentReader = null;
+            // start reading
+            unsafeReadNext(currentSegmentReader);
+            unsafePrefetchNextSegment(true);
+            hasSegmentToRead = true;
+        } else {
+            unsafePrefetchNextSegment(false);
+            if (null != nextSegmentReader) {
+                currentSegmentReader = nextSegmentReader;
+                logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+                currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+                nextSegmentReader = null;
+                unsafeReadNext(currentSegmentReader);
+                unsafePrefetchNextSegment(true);
+                hasSegmentToRead = true;
+            }
+        }
+        if (!hasSegmentToRead) { // no more segments to read, wait until a new log segment arrives
+            if (isCatchingUp) {
+                logger.info("ReadAhead for {} is caught up and no log segments to read now",
+                        readHandler.getFullyQualifiedName());
+                isCatchingUp = false;
+            }
+            pauseReadAheadOnNoMoreLogSegments();
+        }
+    }
+
+    void scheduleReadNext() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                if (null == currentSegmentReader) {
+                    pauseReadAheadOnNoMoreLogSegments();
+                    return;
+                }
+                unsafeReadNext(currentSegmentReader);
+            }
+        });
+    }
+
+    private void unsafeReadNext(SegmentReader reader) {
+        reader.readNext().addEventListener(this);
+    }
+
+    @Override
+    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+        if (!started.get()) {
+            return;
+        }
+        logger.info("segments are updated with {}", segments);
+        processLogSegments(segments);
+    }
+
+    @Override
+    public void onLogStreamDeleted() {
+        setLastException(new LogNotFoundException("Log stream "
+                + streamName + " is deleted"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
new file mode 100644
index 0000000..9935d5f
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector;
+import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector;
+import org.apache.distributedlog.selector.LastRecordSelector;
+import org.apache.distributedlog.selector.LogRecordSelector;
+import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Utility functions for readers.
+ */
+public class ReadUtils {
+
+    static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class);
+
+    private static final int MIN_SEARCH_BATCH_SIZE = 2;
+
+    //
+    // Read First & Last Record Functions
+    //
+
+    /**
+     * Read last record from a log segment.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param l
+     *          log segment metadata.
+     * @param fence
+     *          whether to fence the log segment.
+     * @param includeControl
+     *          whether to include control record.
+     * @param includeEndOfStream
+     *          whether to include end of stream.
+     * @param scanStartBatchSize
+     *          first num entries used for read last record scan
+     * @param scanMaxBatchSize
+     *          max num entries used for read last record scan
+     * @param numRecordsScanned
+     *          num of records scanned to get last record
+     * @param executorService
+     *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
+     * @return a future with last record.
+     */
+    public static Future<LogRecordWithDLSN> asyncReadLastRecord(
+            final String streamName,
+            final LogSegmentMetadata l,
+            final boolean fence,
+            final boolean includeControl,
+            final boolean includeEndOfStream,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final AtomicInteger numRecordsScanned,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore) {
+        final LogRecordSelector selector = new LastRecordSelector();
+        return asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize,
+                               scanMaxBatchSize, numRecordsScanned, executorService, entryStore,
+                               selector, true /* backward */, 0L);
+    }
+
+    /**
+     * Read the first user record from a log segment whose DLSN is not less than the given <i>dlsn</i>.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param l
+     *          log segment metadata.
+     * @param scanStartBatchSize
+     *          first num entries used for the record scan
+     * @param scanMaxBatchSize
+     *          max num entries used for the record scan
+     * @param numRecordsScanned
+     *          num of records scanned to find the first record
+     * @param executorService
+     *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
+     * @param dlsn
+     *          threshold dlsn
+     * @return a future with the first record whose DLSN is not less than <i>dlsn</i>.
+     */
+    public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord(
+            final String streamName,
+            final LogSegmentMetadata l,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final AtomicInteger numRecordsScanned,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore,
+            final DLSN dlsn) {
+        long startEntryId = 0L;
+        if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) {
+            startEntryId = dlsn.getEntryId();
+        }
+        final LogRecordSelector selector = new FirstDLSNNotLessThanSelector(dlsn);
+        return asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize,
+                               scanMaxBatchSize, numRecordsScanned, executorService, entryStore,
+                               selector, false /* backward */, startEntryId);
+    }
+
+    //
+    // Private methods for scanning log segments
+    //
+
+    private static class ScanContext {
+        // variables about the current scan state
+        final AtomicInteger numEntriesToScan;
+        final AtomicLong curStartEntryId;
+        final AtomicLong curEndEntryId;
+
+        // scan settings
+        final long startEntryId;
+        final long endEntryId;
+        final int scanStartBatchSize;
+        final int scanMaxBatchSize;
+        final boolean includeControl;
+        final boolean includeEndOfStream;
+        final boolean backward;
+
+        // number of records scanned
+        final AtomicInteger numRecordsScanned;
+
+        ScanContext(long startEntryId, long endEntryId,
+                    int scanStartBatchSize,
+                    int scanMaxBatchSize,
+                    boolean includeControl,
+                    boolean includeEndOfStream,
+                    boolean backward,
+                    AtomicInteger numRecordsScanned) {
+            this.startEntryId = startEntryId;
+            this.endEntryId = endEntryId;
+            this.scanStartBatchSize = scanStartBatchSize;
+            this.scanMaxBatchSize = scanMaxBatchSize;
+            this.includeControl = includeControl;
+            this.includeEndOfStream = includeEndOfStream;
+            this.backward = backward;
+            // Scan state
+            this.numEntriesToScan = new AtomicInteger(scanStartBatchSize);
+            if (backward) {
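+                // a backward scan starts from the tail: the initial window covers the last
+                // scanStartBatchSize entries, ending at endEntryId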
+                this.curStartEntryId = new AtomicLong(
+                        Math.max(startEntryId, (endEntryId - scanStartBatchSize + 1)));
+                this.curEndEntryId = new AtomicLong(endEntryId);
+            } else {
+                this.curStartEntryId = new AtomicLong(startEntryId);
+                this.curEndEntryId = new AtomicLong(
+                        Math.min(endEntryId, (startEntryId + scanStartBatchSize - 1)));
+            }
+            this.numRecordsScanned = numRecordsScanned;
+        }
+
+        boolean moveToNextRange() {
+            if (backward) {
+                return moveBackward();
+            } else {
+                return moveForward();
+            }
+        }
+
+        boolean moveBackward() {
+            long nextEndEntryId = curStartEntryId.get() - 1;
+            if (nextEndEntryId < startEntryId) {
+                // no entries to read again
+                return false;
+            }
+            curEndEntryId.set(nextEndEntryId);
+            // update num entries to scan
+            numEntriesToScan.set(
+                    Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize));
+            // update start entry id
+            curStartEntryId.set(Math.max(startEntryId, nextEndEntryId - numEntriesToScan.get() + 1));
+            return true;
+        }
+
+        boolean moveForward() {
+            long nextStartEntryId = curEndEntryId.get() + 1;
+            if (nextStartEntryId > endEntryId) {
+                // no entries to read again
+                return false;
+            }
+            curStartEntryId.set(nextStartEntryId);
+            // update num entries to scan
+            numEntriesToScan.set(
+                    Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize));
+            // update end entry id
+            curEndEntryId.set(Math.min(endEntryId, nextStartEntryId + numEntriesToScan.get() - 1));
+            return true;
+        }
+    }
+
+    private static class SingleEntryScanContext extends ScanContext {
+        SingleEntryScanContext(long entryId) {
+            super(entryId, entryId, 1, 1, true, true, false, new AtomicInteger(0));
+        }
+    }
+
+    /**
+     * Read record from a given range of log segment entries.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param reader
+     *          log segment random access reader
+     * @param executorService
+     *          executor service used for processing entries
+     * @param context
+     *          scan context
+     * @return a future with the log record.
+     */
+    private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries(
+            final String streamName,
+            final LogSegmentRandomAccessEntryReader reader,
+            final LogSegmentMetadata metadata,
+            final ExecutorService executorService,
+            final ScanContext context,
+            final LogRecordSelector selector) {
+        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+        final long startEntryId = context.curStartEntryId.get();
+        final long endEntryId = context.curEndEntryId.get();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} reading entries [{} - {}] from {}.",
+                    new Object[] { streamName, startEntryId, endEntryId, metadata});
+        }
+        FutureEventListener<List<Entry.Reader>> readEntriesListener =
+            new FutureEventListener<List<Entry.Reader>>() {
+                @Override
+                public void onSuccess(final List<Entry.Reader> entries) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} finished reading entries [{} - {}] from {}",
+                                new Object[]{ streamName, startEntryId, endEntryId, metadata});
+                    }
+                    for (Entry.Reader entry : entries) {
+                        try {
+                            visitEntryRecords(entry, context, selector);
+                        } catch (IOException ioe) {
+                            // exception is only thrown due to bad ledger entry, so it might be corrupted
+                            // we shouldn't do anything beyond this point. throw the exception to application
+                            promise.setException(ioe);
+                            return;
+                        }
+                    }
+
+                    LogRecordWithDLSN record = selector.result();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} got record from entries [{} - {}] of {} : {}",
+                                new Object[]{streamName, startEntryId, endEntryId,
+                                        metadata, record});
+                    }
+                    promise.setValue(record);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    promise.setException(cause);
+                }
+            };
+        reader.readEntries(startEntryId, endEntryId)
+                .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+        return promise;
+    }
+
+    /**
+     * Feed each record in the entry to the given {@link LogRecordSelector}.
+     *
+     * @param entry
+     *          ledger entry
+     * @param context
+     *          scan context
+     * @param selector
+     *          record selector to process the records
+     * @throws IOException
+     */
+    private static void visitEntryRecords(
+            Entry.Reader entry,
+            ScanContext context,
+            LogRecordSelector selector) throws IOException {
+        LogRecordWithDLSN nextRecord = entry.nextRecord();
+        while (nextRecord != null) {
+            LogRecordWithDLSN record = nextRecord;
+            nextRecord = entry.nextRecord();
+            context.numRecordsScanned.incrementAndGet();
+            if (!context.includeControl && record.isControl()) {
+                continue;
+            }
+            if (!context.includeEndOfStream && record.isEndOfStream()) {
+                continue;
+            }
+            selector.process(record);
+        }
+    }
+
+    /**
+     * Scan entries for the given record.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param reader
+     *          log segment random access reader
+     * @param executorService
+     *          executor service used for processing entries
+     * @param promise
+     *          promise to return desired record.
+     * @param context
+     *          scan context
+     */
+    private static void asyncReadRecordFromEntries(
+            final String streamName,
+            final LogSegmentRandomAccessEntryReader reader,
+            final LogSegmentMetadata metadata,
+            final ExecutorService executorService,
+            final Promise<LogRecordWithDLSN> promise,
+            final ScanContext context,
+            final LogRecordSelector selector) {
+        FutureEventListener<LogRecordWithDLSN> readEntriesListener =
+            new FutureEventListener<LogRecordWithDLSN>() {
+                @Override
+                public void onSuccess(LogRecordWithDLSN value) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} read record from [{} - {}] of {} : {}",
+                                new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(),
+                                        metadata, value});
+                    }
+                    if (null != value) {
+                        promise.setValue(value);
+                        return;
+                    }
+                    if (!context.moveToNextRange()) {
+                        // no entries to read again
+                        promise.setValue(null);
+                        return;
+                    }
+                    // scan next range
+                    asyncReadRecordFromEntries(streamName,
+                            reader,
+                            metadata,
+                            executorService,
+                            promise,
+                            context,
+                            selector);
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    promise.setException(cause);
+                }
+            };
+        asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector)
+                .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+    }
+
+    private static void asyncReadRecordFromLogSegment(
+            final String streamName,
+            final LogSegmentRandomAccessEntryReader reader,
+            final LogSegmentMetadata metadata,
+            final ExecutorService executorService,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final boolean includeControl,
+            final boolean includeEndOfStream,
+            final Promise<LogRecordWithDLSN> promise,
+            final AtomicInteger numRecordsScanned,
+            final LogRecordSelector selector,
+            final boolean backward,
+            final long startEntryId) {
+        final long lastAddConfirmed = reader.getLastAddConfirmed();
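+        // a negative last-add-confirmed indicates that no entries have been confirmed yet, i.e. the segment is empty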
+        if (lastAddConfirmed < 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName });
+            }
+            promise.setValue(null);
+            return;
+        }
+        final ScanContext context = new ScanContext(
+                startEntryId, lastAddConfirmed,
+                scanStartBatchSize, scanMaxBatchSize,
+                includeControl, includeEndOfStream, backward, numRecordsScanned);
+        asyncReadRecordFromEntries(streamName, reader, metadata, executorService,
+                                   promise, context, selector);
+    }
+
+    private static Future<LogRecordWithDLSN> asyncReadRecord(
+            final String streamName,
+            final LogSegmentMetadata l,
+            final boolean fence,
+            final boolean includeControl,
+            final boolean includeEndOfStream,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final AtomicInteger numRecordsScanned,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore,
+            final LogRecordSelector selector,
+            final boolean backward,
+            final long startEntryId) {
+
+        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+
+        FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
+                @Override
+                public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} Opened log segment {} for reading record",
+                                streamName, l);
+                    }
+                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
+                        @Override
+                        public BoxedUnit apply() {
+                            reader.asyncClose();
+                            return BoxedUnit.UNIT;
+                        }
+                    });
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} {} scanning {}.", new Object[]{
+                                (backward ? "backward" : "forward"), streamName, l});
+                    }
+                    asyncReadRecordFromLogSegment(
+                            streamName, reader, l, executorService,
+                            scanStartBatchSize, scanMaxBatchSize,
+                            includeControl, includeEndOfStream,
+                            promise, numRecordsScanned, selector, backward, startEntryId);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    promise.setException(cause);
+                }
+            };
+        entryStore.openRandomAccessReader(l, fence)
+                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+        return promise;
+    }
+
+    //
+    // Search Functions
+    //
+
+    /**
+     * Get the log record whose transaction id is not less than provided <code>transactionId</code>.
+     *
+     * <p>
+     * It uses a binary-search-like algorithm to find the log record whose transaction id is not
+     * less than the provided <code>transactionId</code> within a log <code>segment</code>. A log
+     * segment can be thought of as a sequence of records whose transaction ids are non-decreasing.
+     *
+     * - The sequence of records within a log segment is divided into N pieces.
+     * - Find the piece of records that contains a record whose transaction id is not less than the
+     *   provided <code>transactionId</code>.
+     *
+     * N can be chosen to trade off concurrency against latency.
+     * </p>
+     *
+     * @param logName
+     *          name of the log
+     * @param segment
+     *          metadata of the log segment
+     * @param transactionId
+     *          transaction id
+     * @param executorService
+     *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
+     * @param nWays
+     *          the number of entries to search in parallel
+     * @return the found log record, or none if all transaction ids are less than the provided
+     *         <code>transactionId</code>.
+     */
+    public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
+            final String logName,
+            final LogSegmentMetadata segment,
+            final long transactionId,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore,
+            final int nWays) {
+        if (!segment.isInProgress()) {
+            if (segment.getLastTxId() < transactionId) {
+                // all log records' transaction ids are less than the provided transactionId,
+                // so return none
+                Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
+                return Future.value(noneRecord);
+            }
+        }
+
+        final Promise<Optional<LogRecordWithDLSN>> promise =
+                new Promise<Optional<LogRecordWithDLSN>>();
+        final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
+                @Override
+                public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
+                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
+                        @Override
+                        public BoxedUnit apply() {
+                            reader.asyncClose();
+                            return BoxedUnit.UNIT;
+                        }
+
+                    });
+                    long lastEntryId = reader.getLastAddConfirmed();
+                    if (lastEntryId < 0) {
+                        // the log segment has been created but not written to yet, i.e. it is an
+                        // empty log segment, which is equivalent to 'all log records' transaction
+                        // ids are less than the provided transactionId'
+                        Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
+                        promise.setValue(noneRecord);
+                        return;
+                    }
+                    // the segment's first transaction id is already not less than the provided
+                    // transactionId, so the answer is the first record of the segment: scan entry 0 only
+                    if (segment.getFirstTxId() >= transactionId) {
+                        final FirstTxIdNotLessThanSelector selector =
+                                new FirstTxIdNotLessThanSelector(transactionId);
+                        asyncReadRecordFromEntries(
+                                logName,
+                                reader,
+                                segment,
+                                executorService,
+                                new SingleEntryScanContext(0L),
+                                selector
+                        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+                            @Override
+                            public void onSuccess(LogRecordWithDLSN value) {
+                                promise.setValue(Optional.of(selector.result()));
+                            }
+
+                            @Override
+                            public void onFailure(Throwable cause) {
+                                promise.setException(cause);
+                            }
+                        });
+
+                        return;
+                    }
+                    getLogRecordNotLessThanTxIdFromEntries(
+                            logName,
+                            segment,
+                            transactionId,
+                            executorService,
+                            reader,
+                            Lists.newArrayList(0L, lastEntryId),
+                            nWays,
+                            Optional.<LogRecordWithDLSN>absent(),
+                            promise);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    promise.setException(cause);
+                }
+            };
+
+        entryStore.openRandomAccessReader(segment, false)
+                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+        return promise;
+    }
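+
+    // Illustrative walk-through of the n-way search above, using hypothetical entry ids and
+    // transaction ids (not taken from any real segment):
+    //
+    //   segment entries 0..100, transactionId = 570, nWays = 4
+    //
+    //   round 1: scan entries {0, 100}
+    //            -> selected txids {100, 1000}; entry 100 holds a qualifying record at slot 0,
+    //               but it is neither in the first searched entry nor adjacent to entry 0
+    //   round 2: scan entries {1, 25, 49, 99} (from getEntriesToSearch(1, 99, 4))
+    //            -> suppose selected txids {120, 300, 550, 580}; narrow further between 49 and 99
+    //   ...      the recursion stops once the qualifying record is not at slot 0 of its entry,
+    //            or the two bracketing entries become adjacent.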
+
+    /**
+     * Find the log record whose transaction id is not less than provided <code>transactionId</code> from
+     * entries between <code>startEntryId</code> and <code>endEntryId</code>.
+     *
+     * @param logName
+     *          name of the log
+     * @param segment
+     *          log segment
+     * @param transactionId
+     *          provided transaction id to search
+     * @param executorService
+     *          executor service
+     * @param reader
+     *          log segment random access reader
+     * @param entriesToSearch
+     *          list of entries to search
+     * @param nWays
+     *          how many entries to search in parallel
+     * @param prevFoundRecord
+     *          the log record found in previous search
+     * @param promise
+     *          promise to satisfy the result
+     */
+    private static void getLogRecordNotLessThanTxIdFromEntries(
+            final String logName,
+            final LogSegmentMetadata segment,
+            final long transactionId,
+            final ExecutorService executorService,
+            final LogSegmentRandomAccessEntryReader reader,
+            final List<Long> entriesToSearch,
+            final int nWays,
+            final Optional<LogRecordWithDLSN> prevFoundRecord,
+            final Promise<Optional<LogRecordWithDLSN>> promise) {
+        final List<Future<LogRecordWithDLSN>> searchResults =
+                Lists.newArrayListWithExpectedSize(entriesToSearch.size());
+        for (Long entryId : entriesToSearch) {
+            LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
+            Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries(
+                    logName,
+                    reader,
+                    segment,
+                    executorService,
+                    new SingleEntryScanContext(entryId),
+                    selector);
+            searchResults.add(searchResult);
+        }
+        FutureEventListener<List<LogRecordWithDLSN>> processSearchResultsListener =
+                new FutureEventListener<List<LogRecordWithDLSN>>() {
+                    @Override
+                    public void onSuccess(List<LogRecordWithDLSN> resultList) {
+                        processSearchResults(
+                                logName,
+                                segment,
+                                transactionId,
+                                executorService,
+                                reader,
+                                resultList,
+                                nWays,
+                                prevFoundRecord,
+                                promise);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        promise.setException(cause);
+                    }
+                };
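+        // collect the results of all single-entry scans; if any scan fails, the collected future
+        // fails and the error is propagated to the promise by the listener above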
+        Future.collect(searchResults).addEventListener(
+                FutureEventListenerRunnable.of(processSearchResultsListener, executorService));
+    }
+
+    /**
+     * Process the results of one round of parallel single-entry scans: either satisfy the promise
+     * with the record found (or the best record found so far), or narrow the search range and
+     * kick off another round.
+     */
+    static void processSearchResults(
+            final String logName,
+            final LogSegmentMetadata segment,
+            final long transactionId,
+            final ExecutorService executorService,
+            final LogSegmentRandomAccessEntryReader reader,
+            final List<LogRecordWithDLSN> searchResults,
+            final int nWays,
+            final Optional<LogRecordWithDLSN> prevFoundRecord,
+            final Promise<Optional<LogRecordWithDLSN>> promise) {
+        int found = -1;
+        for (int i = 0; i < searchResults.size(); i++) {
+            LogRecordWithDLSN record = searchResults.get(i);
+            if (record.getTransactionId() >= transactionId) {
+                found = i;
+                break;
+            }
+        }
+        if (found == -1) { // all searched records' transaction ids are less than the provided transaction id
+            promise.setValue(prevFoundRecord);
+            return;
+        }
+        // we found a candidate record
+        LogRecordWithDLSN foundRecord = searchResults.get(found);
+
+        // the candidate is the final answer if any of the following holds:
+        //   - it is not the first record of its entry, so the preceding record in the same entry
+        //     has a smaller transaction id
+        //   - it was found in the first searched entry
+        //   - its entry is adjacent to the previously searched entry, so no entries lie in between
+        if (foundRecord.getDlsn().getSlotId() != 0L
+                || found == 0
+                || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) {
+            promise.setValue(Optional.of(foundRecord));
+            return;
+        }
+
+        // otherwise, keep searching the entries between the previously searched entry and the found one
+        List<Long> nextSearchBatch = getEntriesToSearch(
+                transactionId,
+                searchResults.get(found - 1),
+                searchResults.get(found),
+                nWays);
+        if (nextSearchBatch.isEmpty()) {
+            promise.setValue(prevFoundRecord);
+            return;
+        }
+        getLogRecordNotLessThanTxIdFromEntries(
+                logName,
+                segment,
+                transactionId,
+                executorService,
+                reader,
+                nextSearchBatch,
+                nWays,
+                Optional.of(foundRecord),
+                promise);
+    }
+
+    /**
+     * Get the entries to search for the provided <code>transactionId</code> between
+     * <code>firstRecord</code> and <code>lastRecord</code>. <code>firstRecord</code>
+     * and <code>lastRecord</code> have already been searched: the transaction id
+     * of <code>firstRecord</code> is less than <code>transactionId</code> and the
+     * transaction id of <code>lastRecord</code> is not less than <code>transactionId</code>.
+     *
+     * @param transactionId
+     *          transaction id to search
+     * @param firstRecord
+     *          already searched log record whose transaction id is less than <code>transactionId</code>.
+     * @param lastRecord
+     *          already searched log record whose transaction id is not less than <code>transactionId</code>.
+     * @param nWays
+     *          N-ways to search
+     * @return the list of entries to search
+     */
+    static List<Long> getEntriesToSearch(
+            long transactionId,
+            LogRecordWithDLSN firstRecord,
+            LogRecordWithDLSN lastRecord,
+            int nWays) {
+        long txnDiff = lastRecord.getTransactionId() - firstRecord.getTransactionId();
+        if (txnDiff > 0) {
+            if (lastRecord.getTransactionId() == transactionId) {
+                List<Long> entries = getEntriesToSearch(
+                        firstRecord.getDlsn().getEntryId() + 1,
+                        lastRecord.getDlsn().getEntryId() - 2,
+                        Math.max(MIN_SEARCH_BATCH_SIZE, nWays - 1));
+                entries.add(lastRecord.getDlsn().getEntryId() - 1);
+                return entries;
+            } else {
+                // TODO: improve it by estimating transaction ids.
+                return getEntriesToSearch(
+                        firstRecord.getDlsn().getEntryId() + 1,
+                        lastRecord.getDlsn().getEntryId() - 1,
+                        nWays);
+            }
+        } else {
+            // unexpected condition
+            return Lists.newArrayList();
+        }
+    }
+
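+    // Example with hypothetical arguments: getEntriesToSearch(1L, 99L, 4) computes
+    // step = max(1, 99 / 4) = 24, the loop picks entries [1, 25, 49], and 99 is appended because
+    // the last picked entry (49) is still short of endEntryId, yielding [1, 25, 49, 99].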
+    static List<Long> getEntriesToSearch(
+            long startEntryId,
+            long endEntryId,
+            int nWays) {
+        if (startEntryId > endEntryId) {
+            return Lists.newArrayList();
+        }
+        long numEntries = endEntryId - startEntryId + 1;
+        long step = Math.max(1L, numEntries / nWays);
+        List<Long> entryList = Lists.newArrayListWithExpectedSize(nWays);
+        for (long i = startEntryId, j = nWays - 1; i <= endEntryId && j > 0; i += step, j--) {
+            entryList.add(i);
+        }
+        if (entryList.get(entryList.size() - 1) < endEntryId) {
+            entryList.add(endEntryId);
+        }
+        return entryList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
new file mode 100644
index 0000000..d25d056
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.util.PermitLimiter;
+
+public class WriteLimiter {
+
+    final String streamName;
+    final PermitLimiter streamLimiter;
+    final PermitLimiter globalLimiter;
+
+    public WriteLimiter(String streamName, PermitLimiter streamLimiter, PermitLimiter globalLimiter) {
+        this.streamName = streamName;
+        this.streamLimiter = streamLimiter;
+        this.globalLimiter = globalLimiter;
+    }
+
+    public void acquire() throws OverCapacityException {
+        if (!streamLimiter.acquire()) {
+            throw new OverCapacityException(String.format("Stream write capacity exceeded for stream %s", streamName));
+        }
+        try {
+            if (!globalLimiter.acquire()) {
+                throw new OverCapacityException("Global write capacity exceeded");
+            }
+        } catch (OverCapacityException ex) {
+            streamLimiter.release(1);
+            throw ex;
+        }
+    }
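+
+    // Minimal usage sketch (illustrative only; the try/finally shape and the "limiter" variable
+    // are assumptions, not taken from a real call site):
+    //
+    //   WriteLimiter limiter = new WriteLimiter(streamName, streamLimiter, globalLimiter);
+    //   limiter.acquire();      // throws OverCapacityException if either limit is exhausted
+    //   try {
+    //       // issue the write
+    //   } finally {
+    //       limiter.release();  // return the stream permit and the global permit
+    //   }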
+
+    public void release() {
+        release(1);
+    }
+
+    public void release(int permits) {
+        streamLimiter.release(permits);
+        globalLimiter.release(permits);
+    }
+
+    public void close() {
+        streamLimiter.close();
+        globalLimiter.close();
+    }
+}