You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:08 UTC
[03/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
new file mode 100644
index 0000000..0b8c55a
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -0,0 +1,992 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Futures;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * New ReadAhead Reader that uses {@link org.apache.distributedlog.logsegment.LogSegmentEntryReader}.
+ *
+ * NOTE: all state changes happen in the same thread. All *unsafe* methods should be submitted to the ordered
+ * scheduler using the stream name as the key.
+ */
+public class ReadAheadEntryReader implements
+ AsyncCloseable,
+ LogSegmentListener,
+ LogSegmentEntryReader.StateChangeListener,
+ FutureEventListener<List<Entry.Reader>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
+
+ //
+ // Static Functions
+ //
+
+ private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(LogSegmentEntryReader reader) {
+ reader.start();
+ return BoxedUnit.UNIT;
+ }
+ };
+
+ //
+ // Internal Classes
+ //
+
+    /**
+     * Wraps the asynchronous opening, starting, reading and closing of the entry reader
+     * of a single log segment.
+     *
+     * <p>All accessors and mutators are synchronized on this instance. The underlying
+     * {@link LogSegmentEntryReader} becomes available once the future returned by the entry
+     * store completes; until then operations are chained onto that future.
+     */
+    class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {
+
+        private LogSegmentMetadata metadata;
+        private final long startEntryId;
+        // null until openReader() is called
+        private Future<LogSegmentEntryReader> openFuture = null;
+        // null until the open future completes successfully
+        private LogSegmentEntryReader reader = null;
+        private boolean isStarted = false;
+        private boolean isClosed = false;
+
+        /**
+         * @param metadata metadata of the log segment to read
+         * @param startEntryId entry id passed to the entry store when opening the reader
+         */
+        SegmentReader(LogSegmentMetadata metadata,
+                      long startEntryId) {
+            this.metadata = metadata;
+            this.startEntryId = startEntryId;
+        }
+
+        /** @return the opened entry reader, or null if it has not been opened successfully yet. */
+        synchronized LogSegmentEntryReader getEntryReader() {
+            return reader;
+        }
+
+        /** @return true only if the reader is open and reports being beyond last add confirmed. */
+        synchronized boolean isBeyondLastAddConfirmed() {
+            return null != reader && reader.isBeyondLastAddConfirmed();
+        }
+
+        /** @return the most recent metadata recorded for this segment. */
+        synchronized LogSegmentMetadata getSegment() {
+            return metadata;
+        }
+
+        /** @return true once {@link #openReader()} has been called (the open may still be pending). */
+        synchronized boolean isReaderOpen() {
+            return null != openFuture;
+        }
+
+        /** Asks the entry store to open the reader; subsequent calls are no-ops. */
+        synchronized void openReader() {
+            if (null != openFuture) {
+                return;
+            }
+            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
+        }
+
+        /** @return true once {@link #startRead()} has been called. */
+        synchronized boolean isReaderStarted() {
+            return isStarted;
+        }
+
+        /**
+         * Starts the entry reader, immediately if it is already open, otherwise as soon as
+         * the open future succeeds. Subsequent calls are no-ops.
+         * NOTE(review): requires {@link #openReader()} to have been called first, otherwise
+         * openFuture is null here.
+         */
+        synchronized void startRead() {
+            if (isStarted) {
+                return;
+            }
+            isStarted = true;
+            if (null != reader) {
+                reader.start();
+            } else {
+                openFuture.onSuccess(START_READER_FUNC);
+            }
+        }
+
+        /**
+         * Reads the next batch of entries, chaining onto the open future when the reader is
+         * not available yet.
+         *
+         * @return future of the next batch of entries
+         */
+        synchronized Future<List<Entry.Reader>> readNext() {
+            if (null != reader) {
+                checkCatchingUpStatus(reader);
+                return reader.readNext(numReadAheadEntries);
+            } else {
+                return openFuture.flatMap(readFunc);
+            }
+        }
+
+        /**
+         * Forwards updated metadata to the entry reader and records it, either right away or
+         * once the reader has been opened.
+         *
+         * @param segment the new metadata of this log segment
+         */
+        synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) {
+            if (null != reader) {
+                reader.onLogSegmentMetadataUpdated(segment);
+                this.metadata = segment;
+            } else {
+                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(LogSegmentEntryReader reader) {
+                        reader.onLogSegmentMetadataUpdated(segment);
+                        synchronized (SegmentReader.this) {
+                            SegmentReader.this.metadata = segment;
+                        }
+                        return BoxedUnit.UNIT;
+                    }
+                });
+            }
+        }
+
+        /**
+         * Records the opened reader; for an inprogress segment the enclosing readahead reader
+         * is registered as its state change listener.
+         */
+        @Override
+        synchronized public void onSuccess(LogSegmentEntryReader reader) {
+            this.reader = reader;
+            if (reader.getSegment().isInProgress()) {
+                reader.registerListener(ReadAheadEntryReader.this);
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            // no-op, the failure will be propagated on first read.
+        }
+
+        /** @return true once this segment reader has finished closing. */
+        synchronized boolean isClosed() {
+            return isClosed;
+        }
+
+        /**
+         * Closes the underlying entry reader once it has been opened.
+         *
+         * @return future satisfied when the close attempt has finished
+         */
+        synchronized Future<Void> close() {
+            if (null == openFuture) {
+                // Never opened, so there is nothing to release. Mark it closed right away;
+                // otherwise it would never be purged from the head of the to-close queue.
+                isClosed = true;
+                return Future.Void();
+            }
+            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
+                @Override
+                public Future<Void> apply(LogSegmentEntryReader reader) {
+                    return reader.asyncClose();
+                }
+            }).ensure(new Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    // runs whether the open/close succeeded or failed
+                    synchronized (SegmentReader.this) {
+                        isClosed = true;
+                    }
+                    return BoxedUnit.UNIT;
+                }
+            });
+        }
+    }
+
+    /**
+     * Function chained onto a pending open future: once the entry reader is available it
+     * refreshes the catching-up status and requests the next batch of entries.
+     */
+    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {
+
+        // number of entries requested per read
+        private final int batchSize;
+
+        ReadEntriesFunc(int numEntries) {
+            batchSize = numEntries;
+        }
+
+        @Override
+        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader entryReader) {
+            checkCatchingUpStatus(entryReader);
+            final Future<List<Entry.Reader>> nextBatch = entryReader.readNext(batchSize);
+            return nextBatch;
+        }
+    }
+
+    /**
+     * Runnable whose work is skipped once the readahead reader has started closing, and
+     * whose unexpected failures are logged rather than propagated to the executing thread.
+     */
+    private abstract class CloseableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            final boolean closing;
+            synchronized (ReadAheadEntryReader.this) {
+                closing = (null != closePromise);
+            }
+            if (closing) {
+                return;
+            }
+            try {
+                safeRun();
+            } catch (Throwable cause) {
+                logger.error("Caught unexpected exception : ", cause);
+            }
+        }
+
+        /** The actual work; only invoked while the reader is not closing. */
+        abstract void safeRun();
+
+    }
+
+ //
+ // Functions
+ //
+ // Reads the next batch of entries from an opened entry reader; assigned in the constructor
+ // and chained onto a segment reader's open future when the reader is not available yet.
+ private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
+ // Hook run after a segment reader finishes closing; it schedules the purge of closed
+ // readers from the to-close queue.
+ private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ removeClosedSegmentReaders();
+ return BoxedUnit.UNIT;
+ }
+ };
+
+ //
+ // Resources
+ //
+ private final DistributedLogConfiguration conf;
+ private final BKLogReadHandler readHandler;
+ private final LogSegmentEntryStore entryStore;
+ private final OrderedScheduler scheduler;
+
+ //
+ // Parameters
+ //
+ private final String streamName;
+ private final DLSN fromDLSN;
+ private final int maxCachedEntries;
+ private final int numReadAheadEntries;
+ private final int idleWarnThresholdMillis;
+
+ //
+ // Cache
+ //
+ private final LinkedBlockingQueue<Entry.Reader> entryQueue;
+
+ //
+ // State of the reader
+ //
+
+ // set to true by start(); onSegmentsUpdated() ignores notifications until then
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ // set to true at the end of a successful unsafeInitializeLogSegments()
+ private boolean isInitialized = false;
+ // only read and written by methods synchronized on this reader
+ private boolean readAheadPaused = false;
+ // non-null once asyncClose() has been called; accessed under synchronized (this)
+ private Promise<Void> closePromise = null;
+ // segment readers
+ private long currentSegmentSequenceNumber;
+ private SegmentReader currentSegmentReader;
+ private SegmentReader nextSegmentReader;
+ private DLSN lastDLSN;
+ // position following the last entry delivered by readahead, see onSuccess(List)
+ private final EntryPosition nextEntryPosition;
+ private volatile boolean isCatchingUp = true;
+ // readers for segments that follow the current and next segments
+ private final LinkedList<SegmentReader> segmentReaders;
+ // readers whose close has been requested but not yet observed as completed
+ private final LinkedList<SegmentReader> segmentReadersToClose;
+ // last exception that this reader encounters
+ private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null);
+ // last entry added time
+ private final Stopwatch lastEntryAddedTime;
+ // state change notification
+ private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications =
+ new CopyOnWriteArraySet<AsyncNotification>();
+ // idle reader check task
+ // (null when idle checking is disabled, see scheduleIdleReaderTaskIfNecessary())
+ private final ScheduledFuture<?> idleReaderCheckTask;
+
+ //
+ // Stats
+ //
+ private final AlertStatsLogger alertStatsLogger;
+
+ /**
+ * Creates the readahead entry reader. No segment is read until {@link #start(List)} is called.
+ *
+ * @param streamName name of the stream; also used as the key for ordered scheduling
+ * @param fromDLSN position to start reading from
+ * @param conf configuration supplying the cache size, batch size and idle warn threshold
+ * @param readHandler handler used to re-read the log segment list when readahead is idle
+ * @param entryStore store used to open log segment entry readers
+ * @param scheduler ordered scheduler on which state changes are executed
+ * @param ticker time source for the idle stopwatch
+ * @param alertStatsLogger logger used to raise alerts
+ */
+ public ReadAheadEntryReader(String streamName,
+ DLSN fromDLSN,
+ DistributedLogConfiguration conf,
+ BKLogReadHandler readHandler,
+ LogSegmentEntryStore entryStore,
+ OrderedScheduler scheduler,
+ Ticker ticker,
+ AlertStatsLogger alertStatsLogger) {
+ this.streamName = streamName;
+ this.fromDLSN = lastDLSN = fromDLSN;
+ this.nextEntryPosition = new EntryPosition(
+ fromDLSN.getLogSegmentSequenceNo(),
+ fromDLSN.getEntryId());
+ this.conf = conf;
+ this.maxCachedEntries = conf.getReadAheadMaxRecords();
+ this.numReadAheadEntries = conf.getReadAheadBatchSize();
+ this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
+ this.readHandler = readHandler;
+ this.entryStore = entryStore;
+ this.scheduler = scheduler;
+ this.readFunc = new ReadEntriesFunc(numReadAheadEntries);
+ this.alertStatsLogger = alertStatsLogger;
+
+ // create the segment reader list
+ this.segmentReaders = new LinkedList<SegmentReader>();
+ this.segmentReadersToClose = new LinkedList<SegmentReader>();
+ // create the readahead entry queue
+ this.entryQueue = new LinkedBlockingQueue<Entry.Reader>();
+
+ // start the idle reader detection
+ lastEntryAddedTime = Stopwatch.createStarted(ticker);
+ // start the idle reader check task
+ // NOTE(review): the scheduled task references this instance before the constructor returns.
+ idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();
+ }
+
+ private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+ if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
+ return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+ @Override
+ public void run() {
+ if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ // the readahead has been idle
+ unsafeCheckIfReadAheadIsIdle();
+ }
+ }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+
+ private void unsafeCheckIfReadAheadIsIdle() {
+ boolean forceReadLogSegments =
+ (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed();
+ if (forceReadLogSegments) {
+ readHandler.readLogSegmentsFromStore(
+ LogSegmentMetadata.COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null
+ ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+ @Override
+ public void onFailure(Throwable cause) {
+ // do nothing here since it would be retried on next idle reader check task
+ }
+
+ @Override
+ public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+ onSegmentsUpdated(segments.getValue());
+ }
+ });
+ }
+ }
+
+    /** Cancels the periodic idle check, if one was scheduled. */
+    private void cancelIdleReaderTask() {
+        if (null == idleReaderCheckTask) {
+            return;
+        }
+        idleReaderCheckTask.cancel(true);
+    }
+
+ // Test-only accessors exposing internal readahead state. They return the live fields
+ // without copying or synchronization.
+
+ /** @return the position of the next entry expected by readahead. */
+ @VisibleForTesting
+ EntryPosition getNextEntryPosition() {
+ return nextEntryPosition;
+ }
+
+ /** @return the reader of the segment currently being read, or null if there is none. */
+ @VisibleForTesting
+ SegmentReader getCurrentSegmentReader() {
+ return currentSegmentReader;
+ }
+
+ /** @return the sequence number of the segment currently being read. */
+ @VisibleForTesting
+ long getCurrentSegmentSequenceNumber() {
+ return currentSegmentSequenceNumber;
+ }
+
+ /** @return the prefetched reader of the next segment, or null if there is none. */
+ @VisibleForTesting
+ SegmentReader getNextSegmentReader() {
+ return nextSegmentReader;
+ }
+
+ /** @return the queue of readers for the segments after the current and next ones. */
+ @VisibleForTesting
+ LinkedList<SegmentReader> getSegmentReaders() {
+ return segmentReaders;
+ }
+
+ /** @return true once the reader has been positioned on its first log segment. */
+ @VisibleForTesting
+ boolean isInitialized() {
+ return isInitialized;
+ }
+
+ private void orderedSubmit(Runnable runnable) {
+ synchronized (this) {
+ if (null != closePromise) {
+ return;
+ }
+ }
+ try {
+ scheduler.submit(streamName, runnable);
+ } catch (RejectedExecutionException ree) {
+ logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
+ streamName, ree);
+ }
+ }
+
+ /**
+ * Starts the readahead with the given list of log segments. Marks the reader as started
+ * and submits the segment list for processing.
+ *
+ * @param segmentList the log segments known at start time
+ */
+ public void start(final List<LogSegmentMetadata> segmentList) {
+ logger.info("Starting the readahead entry reader for {} : segments = {}",
+ readHandler.getFullyQualifiedName(), segmentList);
+ // must be set before segment updates are accepted, see onSegmentsUpdated()
+ started.set(true);
+ processLogSegments(segmentList);
+ }
+
+ private void removeClosedSegmentReaders() {
+ orderedSubmit(new CloseableRunnable() {
+ @Override
+ void safeRun() {
+ unsafeRemoveClosedSegmentReaders();
+ }
+ });
+ }
+
+    /**
+     * Drops closed readers from the head of the to-close queue, stopping at the first
+     * reader that has not finished closing.
+     */
+    private void unsafeRemoveClosedSegmentReaders() {
+        for (SegmentReader head = segmentReadersToClose.peekFirst();
+             null != head && head.isClosed();
+             head = segmentReadersToClose.peekFirst()) {
+            segmentReadersToClose.pollFirst();
+        }
+    }
+
+ /**
+ * Closes the readahead reader. The first call installs the close promise, cancels the
+ * idle check and schedules the closing of all segment readers; later calls return the
+ * same promise.
+ *
+ * @return future satisfied when the segment readers have been closed
+ */
+ @Override
+ public Future<Void> asyncClose() {
+ final Promise<Void> closeFuture;
+ synchronized (this) {
+ if (null != closePromise) {
+ return closePromise;
+ }
+ // from here on orderedSubmit() and CloseableRunnable skip their work
+ closePromise = closeFuture = new Promise<Void>();
+ }
+
+ // cancel the idle reader task
+ cancelIdleReaderTask();
+
+ // use runnable here instead of CloseableRunnable,
+ // because we need this to be executed
+ try {
+ scheduler.submit(streamName, new Runnable() {
+ @Override
+ public void run() {
+ unsafeAsyncClose(closeFuture);
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
+ streamName, ree);
+ // the scheduler rejected the task, so close on the calling thread instead
+ unsafeAsyncClose(closeFuture);
+ }
+
+ return closeFuture;
+ }
+
+ private void unsafeAsyncClose(Promise<Void> closePromise) {
+ List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
+ segmentReaders.size() + segmentReadersToClose.size() + 1);
+ if (null != currentSegmentReader) {
+ segmentReadersToClose.add(currentSegmentReader);
+ }
+ if (null != nextSegmentReader) {
+ segmentReadersToClose.add(nextSegmentReader);
+ }
+ for (SegmentReader reader : segmentReaders) {
+ segmentReadersToClose.add(reader);
+ }
+ segmentReaders.clear();
+ for (SegmentReader reader : segmentReadersToClose) {
+ closeFutures.add(reader.close());
+ }
+ Futures.collect(closeFutures).proxyTo(closePromise);
+ }
+
+ //
+ // Reader State Changes
+ //
+
+ /**
+ * Registers a listener to be notified when entries become available or an error is recorded.
+ *
+ * @param notification the listener to add
+ * @return this reader
+ */
+ ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
+ this.stateChangeNotifications.add(notification);
+ return this;
+ }
+
+ /**
+ * Unregisters a previously added listener.
+ *
+ * @param notification the listener to remove
+ * @return this reader
+ */
+ ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
+ this.stateChangeNotifications.remove(notification);
+ return this;
+ }
+
+ // Tells every registered listener that an operation completed.
+ private void notifyStateChangeOnSuccess() {
+ for (AsyncNotification notification : stateChangeNotifications) {
+ notification.notifyOnOperationComplete();
+ }
+ }
+
+ // Tells every registered listener about the given failure.
+ private void notifyStateChangeOnFailure(Throwable cause) {
+ for (AsyncNotification notification : stateChangeNotifications) {
+ notification.notifyOnError(cause);
+ }
+ }
+
+ /**
+ * Records the first exception encountered by this reader and notifies the listeners.
+ * Only the first exception is stored; listeners are notified with {@code cause} on every
+ * call, even when an earlier exception is already recorded.
+ *
+ * @param cause the exception encountered
+ */
+ void setLastException(IOException cause) {
+ if (!lastException.compareAndSet(null, cause)) {
+ // NOTE(review): the message has no "{}" placeholder, so the stored exception is passed
+ // as the trailing throwable argument rather than formatted into the message.
+ logger.debug("last exception has already been set to ", lastException.get());
+ }
+ // the exception is set and notify the state change
+ notifyStateChangeOnFailure(cause);
+ }
+
+ void checkLastException() throws IOException {
+ if (null != lastException.get()) {
+ throw lastException.get();
+ }
+ }
+
+ void checkCatchingUpStatus(LogSegmentEntryReader reader) {
+ if (reader.getSegment().isInProgress()
+ && isCatchingUp
+ && reader.hasCaughtUpOnInprogress()) {
+ logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
+ new Object[] { readHandler.getFullyQualifiedName(),
+ reader.getLastAddConfirmed(), reader.getSegment() });
+ isCatchingUp = false;
+ }
+ }
+
+    /** Clears the catching-up flag and logs the transition; does nothing if already caught up. */
+    void markCaughtup() {
+        if (!isCatchingUp) {
+            return;
+        }
+        isCatchingUp = false;
+        logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName());
+    }
+
+ /**
+ * Return whether the readahead has caught up.
+ *
+ * @return true once the catching-up flag has been cleared, otherwise false.
+ */
+ public boolean isReadAheadCaughtUp() {
+ return !isCatchingUp;
+ }
+
+ /**
+ * Listener callback from an entry reader of an inprogress segment; marks the readahead
+ * as caught up.
+ */
+ @Override
+ public void onCaughtupOnInprogress() {
+ markCaughtup();
+ }
+
+ //
+ // ReadAhead State Machine
+ //
+
+    /**
+     * Callback for a completed batch read: caches the entries, advances the next entry
+     * position past the last one, notifies the listeners, then either pauses because the
+     * cache is full or schedules the next read.
+     *
+     * @param entries the entries read; may be empty
+     */
+    @Override
+    public void onSuccess(List<Entry.Reader> entries) {
+        lastEntryAddedTime.reset().start();
+        entryQueue.addAll(entries);
+        final int numEntries = entries.size();
+        if (numEntries > 0) {
+            final Entry.Reader tail = entries.get(numEntries - 1);
+            nextEntryPosition.advance(tail.getLSSN(), tail.getEntryId() + 1);
+        }
+        // notify on data available
+        notifyStateChangeOnSuccess();
+        if (entryQueue.size() < maxCachedEntries) {
+            scheduleReadNext();
+        } else {
+            pauseReadAheadOnCacheFull();
+        }
+    }
+
+    /**
+     * Callback for a failed batch read. Reaching the end of a log segment moves on to the
+     * next segment; any other failure is recorded as the reader's last exception, wrapped
+     * in an {@link UnexpectedException} when it is not an {@link IOException}.
+     *
+     * @param cause the failure of the read
+     */
+    @Override
+    public void onFailure(Throwable cause) {
+        if (cause instanceof EndOfLogSegmentException) {
+            // we reach end of the log segment
+            moveToNextLogSegment();
+            return;
+        }
+        final IOException failure = (cause instanceof IOException)
+                ? (IOException) cause
+                : new UnexpectedException("Unexpected non I/O exception", cause);
+        setLastException(failure);
+    }
+
+ // Resumes readahead if it is currently paused: schedules the next read and clears the
+ // paused flag. Does nothing when readahead is not paused.
+ private synchronized void invokeReadAhead() {
+ if (readAheadPaused) {
+ scheduleReadNext();
+ readAheadPaused = false;
+ }
+ }
+
+ // Pauses readahead because the cache was observed full. The cache is checked again after
+ // the flag is set: if it is no longer full by then, readahead is resumed immediately.
+ private synchronized void pauseReadAheadOnCacheFull() {
+ this.readAheadPaused = true;
+ if (!isCacheFull()) {
+ invokeReadAhead();
+ }
+ }
+
+ // Pauses readahead because there is no log segment to read; it is resumed by a later
+ // call to invokeReadAhead().
+ private synchronized void pauseReadAheadOnNoMoreLogSegments() {
+ this.readAheadPaused = true;
+ }
+
+ //
+ // Cache Related Methods
+ //
+
+    /**
+     * Takes the next cached entry, waiting up to the given time for one to arrive.
+     * Readahead is resumed when an entry was taken and the cache is no longer full.
+     *
+     * @param waitTime maximum time to wait for an entry
+     * @param waitTimeUnit unit of {@code waitTime}
+     * @return the next entry, or null if none arrived within the wait time
+     * @throws IOException if the reader has recorded an exception, or the wait was interrupted
+     */
+    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
+        final IOException recorded = lastException.get();
+        if (null != recorded) {
+            throw recorded;
+        }
+        final Entry.Reader entry;
+        try {
+            entry = entryQueue.poll(waitTime, waitTimeUnit);
+        } catch (InterruptedException e) {
+            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
+        }
+        // resume readahead once an entry has been taken and the cache has room again
+        if (null != entry && !isCacheFull()) {
+            invokeReadAhead();
+        }
+        return entry;
+    }
+
+ /**
+ * Return the number of entries currently cached.
+ *
+ * @return the number of cached entries.
+ */
+ public int getNumCachedEntries() {
+ return entryQueue.size();
+ }
+
+ /**
+ * Return whether the cache is full, i.e. whether it holds at least the configured
+ * maximum number of cached entries.
+ *
+ * @return true if the cache is full, otherwise false.
+ */
+ public boolean isCacheFull() {
+ return getNumCachedEntries() >= maxCachedEntries;
+ }
+
+ /**
+ * Return whether the cache holds no entries.
+ *
+ * @return true if the cache is empty, otherwise false.
+ */
+ @VisibleForTesting
+ public boolean isCacheEmpty() {
+ return entryQueue.isEmpty();
+ }
+
+ /**
+ * Check whether the readahead has stalled, i.e. whether the time elapsed since the last
+ * successful batch read exceeds the given threshold.
+ *
+ * @param idleReaderErrorThreshold idle reader error threshold
+ * @param timeUnit time unit of the idle reader error threshold
+ * @return true if the readahead has stalled, otherwise false.
+ */
+ public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
+ return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold);
+ }
+
+ //
+ // LogSegment Management
+ //
+
+ void processLogSegments(final List<LogSegmentMetadata> segments) {
+ orderedSubmit(new CloseableRunnable() {
+ @Override
+ void safeRun() {
+ unsafeProcessLogSegments(segments);
+ }
+ });
+ }
+
+    /**
+     * Routes a segment list to first-time initialization, or to re-initialization when the
+     * reader is already positioned.
+     */
+    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
+        if (!isInitialized) {
+            unsafeInitializeLogSegments(segments);
+            return;
+        }
+        unsafeReinitializeLogSegments(segments);
+    }
+
+ /**
+ * Update the log segment metadata.
+ *
+ * @param reader the reader to update the metadata
+ * @param newMetadata the new metadata received
+ * @return true if successfully, false on encountering errors
+ */
+ private boolean updateLogSegmentMetadata(SegmentReader reader,
+ LogSegmentMetadata newMetadata) {
+ if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+ setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
+ return false;
+ }
+ if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
+ setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata
+ + " received after a closed log segment " + reader.getSegment() + " on reading segment "
+ + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName));
+ return false;
+ }
+ if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
+ reader.updateLogSegmentMetadata(newMetadata);
+ }
+ return true;
+ }
+
+ /**
+ * Reinitialize the log segments
+ */
+ private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
+ logger.info("Reinitialize log segments with {}", segments);
+ int segmentIdx = 0;
+ for (; segmentIdx < segments.size(); segmentIdx++) {
+ LogSegmentMetadata segment = segments.get(segmentIdx);
+ if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) {
+ continue;
+ }
+ break;
+ }
+ if (segmentIdx >= segments.size()) {
+ return;
+ }
+ LogSegmentMetadata segment = segments.get(segmentIdx);
+ if (null != currentSegmentReader) {
+ if (!updateLogSegmentMetadata(currentSegmentReader, segment)) {
+ return;
+ }
+ } else {
+ if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+ setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ + streamName + " : current segment sn = " + currentSegmentSequenceNumber
+ + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
+ return;
+ }
+ }
+ segmentIdx++;
+ if (segmentIdx >= segments.size()) {
+ return;
+ }
+ // check next segment
+ segment = segments.get(segmentIdx);
+ if (null != nextSegmentReader) {
+ if (!updateLogSegmentMetadata(nextSegmentReader, segment)) {
+ return;
+ }
+ segmentIdx++;
+ }
+ // check the segment readers in the queue
+ for (int readerIdx = 0;
+ readerIdx < segmentReaders.size() && segmentIdx < segments.size();
+ readerIdx++, segmentIdx++) {
+ SegmentReader reader = segmentReaders.get(readerIdx);
+ segment = segments.get(segmentIdx);
+ if (!updateLogSegmentMetadata(reader, segment)) {
+ return;
+ }
+ }
+ // add the remaining segments to the reader queue
+ for (; segmentIdx < segments.size(); segmentIdx++) {
+ segment = segments.get(segmentIdx);
+ SegmentReader reader = new SegmentReader(segment, 0L);
+ reader.openReader();
+ segmentReaders.add(reader);
+ }
+ if (null == currentSegmentReader) {
+ unsafeMoveToNextLogSegment();
+ }
+ // resume readahead if necessary
+ invokeReadAhead();
+ }
+
+ /**
+ * Initialize the reader with the log <i>segments</i>.
+ *
+ * <p>Creates a segment reader for every segment at or after the start position, skipping
+ * leading truncated segments unless truncation status is ignored. The first reader becomes
+ * the current reader and is read from immediately; the remaining readers are opened and
+ * the next one is prefetched if it is inprogress. The reader stays uninitialized when the
+ * list is empty, when no segment qualifies, or when positioning is not allowed.
+ *
+ * @param segments list of log segments
+ */
+ private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
+ if (segments.isEmpty()) {
+ // not initialize the background reader, until the first log segment is notified
+ return;
+ }
+ // true until the first segment that is kept; only leading truncated segments are skipped
+ boolean skipTruncatedLogSegments = true;
+ DLSN dlsnToStart = fromDLSN;
+ // positioning the reader
+ for (int i = 0; i < segments.size(); i++) {
+ LogSegmentMetadata segment = segments.get(i);
+ // skip any log segments that have smaller log segment sequence numbers
+ if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) {
+ continue;
+ }
+ // if the log segment is truncated, skip it.
+ if (skipTruncatedLogSegments &&
+ !conf.getIgnoreTruncationStatus() &&
+ segment.isTruncated()) {
+ continue;
+ }
+ // if the log segment is partially truncated, move the start dlsn to the min active dlsn
+ if (skipTruncatedLogSegments &&
+ !conf.getIgnoreTruncationStatus() &&
+ segment.isPartiallyTruncated()) {
+ if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+ dlsnToStart = segment.getMinActiveDLSN();
+ }
+ }
+ skipTruncatedLogSegments = false;
+ if (!isAllowedToPosition(segment, dlsnToStart)) {
+ // isAllowedToPosition has already recorded the exception
+ logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart);
+ return;
+ }
+
+ // only the segment containing the start dlsn begins mid-segment; later ones start at entry 0
+ SegmentReader reader = new SegmentReader(segment,
+ segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo()
+ ? dlsnToStart.getEntryId() : 0L);
+ segmentReaders.add(reader);
+ }
+ if (segmentReaders.isEmpty()) {
+ // not initialize the background reader, until the first log segment is available to read
+ return;
+ }
+ currentSegmentReader = segmentReaders.pollFirst();
+ currentSegmentReader.openReader();
+ currentSegmentReader.startRead();
+ currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+ unsafeReadNext(currentSegmentReader);
+ if (!segmentReaders.isEmpty()) {
+ for (SegmentReader reader : segmentReaders) {
+ reader.openReader();
+ }
+ unsafePrefetchNextSegment(true);
+ }
+ // mark the reader initialized
+ isInitialized = true;
+ }
+
+ private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
+ SegmentReader nextReader = segmentReaders.peekFirst();
+ // open the next log segment if it is inprogress
+ if (null != nextReader) {
+ if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) {
+ return;
+ }
+ nextReader.startRead();
+ nextSegmentReader = nextReader;
+ segmentReaders.pollFirst();
+ }
+ }
+
+ /**
+ * Check if we are allowed to position the reader at <i>fromDLSN</i>.
+ *
+ * <p>Positioning is refused, and the reason recorded as the reader's last exception, when
+ * truncation status is not ignored and either the segment is truncated with its last DLSN
+ * at or after <i>fromDLSN</i>, or the segment is partially truncated with its min active
+ * DLSN after <i>fromDLSN</i>.
+ *
+ * @param segment the log segment to position on
+ * @param fromDLSN the position to start reading from
+ * @return true if it is allowed, otherwise false.
+ */
+ private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
+ if (segment.isTruncated()
+ && segment.getLastDLSN().compareTo(fromDLSN) >= 0
+ && !conf.getIgnoreTruncationStatus()) {
+ setLastException(new AlreadyTruncatedTransactionException(streamName
+ + " : trying to position read ahead at " + fromDLSN
+ + " on a segment " + segment + " that is already marked as truncated"));
+ return false;
+ }
+ if (segment.isPartiallyTruncated() &&
+ segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+ // the alert is raised even when truncation status is ignored
+ if (conf.getAlertWhenPositioningOnTruncated()) {
+ alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
+ fromDLSN, segment);
+ }
+ if (!conf.getIgnoreTruncationStatus()) {
+ logger.error("{}: Trying to position reader on {} when {} is marked partially truncated",
+ new Object[]{ streamName, fromDLSN, segment });
+
+ setLastException(new AlreadyTruncatedTransactionException(streamName
+ + " : trying to position read ahead at " + fromDLSN
+ + " on a segment " + segment + " that is already marked as truncated"));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void moveToNextLogSegment() {
+ orderedSubmit(new CloseableRunnable() {
+ @Override
+ void safeRun() {
+ unsafeMoveToNextLogSegment();
+ }
+ });
+ }
+
+ private void unsafeMoveToNextLogSegment() {
+ if (null != currentSegmentReader) {
+ segmentReadersToClose.add(currentSegmentReader);
+ currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
+ logger.debug("close current segment reader {}", currentSegmentReader.getSegment());
+ currentSegmentReader = null;
+ }
+ boolean hasSegmentToRead = false;
+ if (null != nextSegmentReader) {
+ currentSegmentReader = nextSegmentReader;
+ logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+ currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+ nextSegmentReader = null;
+ // start reading
+ unsafeReadNext(currentSegmentReader);
+ unsafePrefetchNextSegment(true);
+ hasSegmentToRead = true;
+ } else {
+ unsafePrefetchNextSegment(false);
+ if (null != nextSegmentReader) {
+ currentSegmentReader = nextSegmentReader;
+ logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+ currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+ nextSegmentReader = null;
+ unsafeReadNext(currentSegmentReader);
+ unsafePrefetchNextSegment(true);
+ hasSegmentToRead = true;
+ }
+ }
+ if (!hasSegmentToRead) { // no more segment to read, wait until new log segment arrive
+ if (isCatchingUp) {
+ logger.info("ReadAhead for {} is caught up and no log segments to read now",
+ readHandler.getFullyQualifiedName());
+ isCatchingUp = false;
+ }
+ pauseReadAheadOnNoMoreLogSegments();
+ }
+ }
+
+ void scheduleReadNext() {
+ orderedSubmit(new CloseableRunnable() {
+ @Override
+ void safeRun() {
+ if (null == currentSegmentReader) {
+ pauseReadAheadOnNoMoreLogSegments();
+ return;
+ }
+ unsafeReadNext(currentSegmentReader);
+ }
+ });
+ }
+
+ // Issues the next batch read on the given segment reader; the outcome is delivered to
+ // this reader's onSuccess(List) / onFailure(Throwable) callbacks.
+ private void unsafeReadNext(SegmentReader reader) {
+ reader.readNext().addEventListener(this);
+ }
+
+    /**
+     * Listener callback for a changed log segment list. Updates received before
+     * {@link #start(List)} has been called are ignored.
+     *
+     * @param segments the updated list of log segments
+     */
+    @Override
+    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+        if (started.get()) {
+            logger.info("segments is updated with {}", segments);
+            processLogSegments(segments);
+        }
+    }
+
+    /** Listener callback for a deleted stream; records a {@link LogNotFoundException}. */
+    @Override
+    public void onLogStreamDeleted() {
+        final String message = "Log stream " + streamName + " is deleted";
+        setLastException(new LogNotFoundException(message));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
new file mode 100644
index 0000000..9935d5f
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector;
+import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector;
+import org.apache.distributedlog.selector.LastRecordSelector;
+import org.apache.distributedlog.selector.LogRecordSelector;
+import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Utility functions for reading and searching log records within a log segment.
+ */
+public class ReadUtils {
+
+ static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class);
+
+ private static final int MIN_SEARCH_BATCH_SIZE = 2;
+
+ //
+ // Read First & Last Record Functions
+ //
+
+    /**
+     * Read the last record of a log segment.
+     *
+     * <p>Delegates to {@code asyncReadRecord} with a {@link LastRecordSelector},
+     * scanning backward with a start entry id of 0.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param l
+     *          log segment metadata.
+     * @param fence
+     *          whether to fence the log segment.
+     * @param includeControl
+     *          whether control records are eligible.
+     * @param includeEndOfStream
+     *          whether end of stream records are eligible.
+     * @param scanStartBatchSize
+     *          number of entries in the first scan batch
+     * @param scanMaxBatchSize
+     *          maximum number of entries in a scan batch
+     * @param numRecordsScanned
+     *          counter incremented for every record visited by the scan
+     * @param executorService
+     *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
+     * @return a future with the last record.
+     */
+    public static Future<LogRecordWithDLSN> asyncReadLastRecord(
+            final String streamName,
+            final LogSegmentMetadata l,
+            final boolean fence,
+            final boolean includeControl,
+            final boolean includeEndOfStream,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final AtomicInteger numRecordsScanned,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore) {
+        return asyncReadRecord(
+                streamName,
+                l,
+                fence,
+                includeControl,
+                includeEndOfStream,
+                scanStartBatchSize,
+                scanMaxBatchSize,
+                numRecordsScanned,
+                executorService,
+                entryStore,
+                new LastRecordSelector(),
+                true /* backward */,
+                0L);
+    }
+
+    /**
+     * Read the first user record of a log segment whose DLSN is not less than the given one.
+     *
+     * <p>The segment is scanned forward without fencing; control records and end of stream
+     * records are skipped. When the segment's sequence number equals that of <code>dlsn</code>,
+     * the scan starts at the entry id of <code>dlsn</code>; otherwise it starts at entry 0.
+     *
+     * @param streamName
+     *          fully qualified stream name (used for logging)
+     * @param l
+     *          log segment metadata.
+     * @param scanStartBatchSize
+     *          number of entries in the first scan batch
+     * @param scanMaxBatchSize
+     *          maximum number of entries in a scan batch
+     * @param numRecordsScanned
+     *          counter incremented for every record visited by the scan
+     * @param executorService
+     *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
+     * @param dlsn
+     *          threshold dlsn
+     * @return a future with the first matching record.
+     */
+    public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord(
+            final String streamName,
+            final LogSegmentMetadata l,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final AtomicInteger numRecordsScanned,
+            final ExecutorService executorService,
+            final LogSegmentEntryStore entryStore,
+            final DLSN dlsn) {
+        final boolean dlsnInThisSegment =
+                l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo();
+        final long startEntryId = dlsnInThisSegment ? dlsn.getEntryId() : 0L;
+        return asyncReadRecord(
+                streamName,
+                l,
+                false /* fence */,
+                false /* includeControl */,
+                false /* includeEndOfStream */,
+                scanStartBatchSize,
+                scanMaxBatchSize,
+                numRecordsScanned,
+                executorService,
+                entryStore,
+                new FirstDLSNNotLessThanSelector(dlsn),
+                false /* backward */,
+                startEntryId);
+    }
+
+ //
+ // Private methods for scanning log segments
+ //
+
+    /**
+     * Cursor over the entry id range <code>[startEntryId, endEntryId]</code>.
+     *
+     * <p>The current window is <code>[curStartEntryId, curEndEntryId]</code>. Each call to
+     * {@link #moveToNextRange()} slides the window in the scan direction and doubles its
+     * size, capped at <code>scanMaxBatchSize</code>.
+     */
+    private static class ScanContext {
+        // current scan state
+        final AtomicInteger numEntriesToScan;
+        final AtomicLong curStartEntryId;
+        final AtomicLong curEndEntryId;
+
+        // scan settings
+        final long startEntryId;
+        final long endEntryId;
+        final int scanStartBatchSize;
+        final int scanMaxBatchSize;
+        final boolean includeControl;
+        final boolean includeEndOfStream;
+        final boolean backward;
+
+        // number of records scanned
+        final AtomicInteger numRecordsScanned;
+
+        ScanContext(long startEntryId, long endEntryId,
+                    int scanStartBatchSize,
+                    int scanMaxBatchSize,
+                    boolean includeControl,
+                    boolean includeEndOfStream,
+                    boolean backward,
+                    AtomicInteger numRecordsScanned) {
+            this.startEntryId = startEntryId;
+            this.endEntryId = endEntryId;
+            this.scanStartBatchSize = scanStartBatchSize;
+            this.scanMaxBatchSize = scanMaxBatchSize;
+            this.includeControl = includeControl;
+            this.includeEndOfStream = includeEndOfStream;
+            this.backward = backward;
+            this.numRecordsScanned = numRecordsScanned;
+
+            // the first window is anchored at the end of the range for a backward scan
+            // and at the start of the range for a forward scan
+            final long firstWindowStart = backward
+                    ? Math.max(startEntryId, (endEntryId - scanStartBatchSize + 1))
+                    : startEntryId;
+            final long firstWindowEnd = backward
+                    ? endEntryId
+                    : Math.min(endEntryId, (startEntryId + scanStartBatchSize - 1));
+            this.numEntriesToScan = new AtomicInteger(scanStartBatchSize);
+            this.curStartEntryId = new AtomicLong(firstWindowStart);
+            this.curEndEntryId = new AtomicLong(firstWindowEnd);
+        }
+
+        /**
+         * Advance the window in the scan direction.
+         *
+         * @return false when the range is exhausted, true otherwise
+         */
+        boolean moveToNextRange() {
+            return backward ? moveBackward() : moveForward();
+        }
+
+        boolean moveBackward() {
+            final long windowEnd = curStartEntryId.get() - 1;
+            if (windowEnd < startEntryId) {
+                // nothing left before the current window
+                return false;
+            }
+            final int windowSize = Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize);
+            curEndEntryId.set(windowEnd);
+            numEntriesToScan.set(windowSize);
+            curStartEntryId.set(Math.max(startEntryId, windowEnd - windowSize + 1));
+            return true;
+        }
+
+        boolean moveForward() {
+            final long windowStart = curEndEntryId.get() + 1;
+            if (windowStart > endEntryId) {
+                // nothing left after the current window
+                return false;
+            }
+            final int windowSize = Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize);
+            curStartEntryId.set(windowStart);
+            numEntriesToScan.set(windowSize);
+            curEndEntryId.set(Math.min(endEntryId, windowStart + windowSize - 1));
+            return true;
+        }
+    }
+
+ /**
+ * Scan context whose range consists of the single entry <code>entryId</code>.
+ */
+ private static class SingleEntryScanContext extends ScanContext {
+ SingleEntryScanContext(long entryId) {
+ // start == end == entryId, both batch sizes are 1, control and end of stream
+ // records are included, the scan is forward, and a private counter is used.
+ super(entryId, entryId, 1, 1, true, true, false, new AtomicInteger(0));
+ }
+ }
+
+ /**
+ * Read record from a given range of log segment entries.
+ *
+ * @param streamName
+ * fully qualified stream name (used for logging)
+ * @param reader
+ * log segment random access reader
+ * @param executorService
+ * executor service used for processing entries
+ * @param context
+ * scan context
+ * @return a future with the log record.
+ */
+ private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries(
+ final String streamName,
+ final LogSegmentRandomAccessEntryReader reader,
+ final LogSegmentMetadata metadata,
+ final ExecutorService executorService,
+ final ScanContext context,
+ final LogRecordSelector selector) {
+ final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+ final long startEntryId = context.curStartEntryId.get();
+ final long endEntryId = context.curEndEntryId.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} reading entries [{} - {}] from {}.",
+ new Object[] { streamName, startEntryId, endEntryId, metadata});
+ }
+ FutureEventListener<List<Entry.Reader>> readEntriesListener =
+ new FutureEventListener<List<Entry.Reader>>() {
+ @Override
+ public void onSuccess(final List<Entry.Reader> entries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} finished reading entries [{} - {}] from {}",
+ new Object[]{ streamName, startEntryId, endEntryId, metadata});
+ }
+ for (Entry.Reader entry : entries) {
+ try {
+ visitEntryRecords(entry, context, selector);
+ } catch (IOException ioe) {
+ // exception is only thrown due to bad ledger entry, so it might be corrupted
+ // we shouldn't do anything beyond this point. throw the exception to application
+ promise.setException(ioe);
+ return;
+ }
+ }
+
+ LogRecordWithDLSN record = selector.result();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} got record from entries [{} - {}] of {} : {}",
+ new Object[]{streamName, startEntryId, endEntryId,
+ metadata, record});
+ }
+ promise.setValue(record);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ promise.setException(cause);
+ }
+ };
+ reader.readEntries(startEntryId, endEntryId)
+ .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+ return promise;
+ }
+
+ /**
+ * Process each record using LogRecordSelector.
+ *
+ * @param entry
+ * ledger entry
+ * @param context
+ * scan context
+ * @return log record with dlsn inside the ledger entry
+ * @throws IOException
+ */
+ private static void visitEntryRecords(
+ Entry.Reader entry,
+ ScanContext context,
+ LogRecordSelector selector) throws IOException {
+ LogRecordWithDLSN nextRecord = entry.nextRecord();
+ while (nextRecord != null) {
+ LogRecordWithDLSN record = nextRecord;
+ nextRecord = entry.nextRecord();
+ context.numRecordsScanned.incrementAndGet();
+ if (!context.includeControl && record.isControl()) {
+ continue;
+ }
+ if (!context.includeEndOfStream && record.isEndOfStream()) {
+ continue;
+ }
+ selector.process(record);
+ }
+ }
+
+ /**
+ * Scan entries for the given record.
+ *
+ * @param streamName
+ * fully qualified stream name (used for logging)
+ * @param reader
+ * log segment random access reader
+ * @param executorService
+ * executor service used for processing entries
+ * @param promise
+ * promise to return desired record.
+ * @param context
+ * scan context
+ */
+ private static void asyncReadRecordFromEntries(
+ final String streamName,
+ final LogSegmentRandomAccessEntryReader reader,
+ final LogSegmentMetadata metadata,
+ final ExecutorService executorService,
+ final Promise<LogRecordWithDLSN> promise,
+ final ScanContext context,
+ final LogRecordSelector selector) {
+ FutureEventListener<LogRecordWithDLSN> readEntriesListener =
+ new FutureEventListener<LogRecordWithDLSN>() {
+ @Override
+ public void onSuccess(LogRecordWithDLSN value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} read record from [{} - {}] of {} : {}",
+ new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(),
+ metadata, value});
+ }
+ if (null != value) {
+ promise.setValue(value);
+ return;
+ }
+ if (!context.moveToNextRange()) {
+ // no entries to read again
+ promise.setValue(null);
+ return;
+ }
+ // scan next range
+ asyncReadRecordFromEntries(streamName,
+ reader,
+ metadata,
+ executorService,
+ promise,
+ context,
+ selector);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ promise.setException(cause);
+ }
+ };
+ asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector)
+ .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+ }
+
+    /**
+     * Start a scan over an opened log segment, from <code>startEntryId</code> up to the
+     * reader's last add confirmed entry. The promise is satisfied with <code>null</code>
+     * right away when the last add confirmed is negative (empty segment).
+     */
+    private static void asyncReadRecordFromLogSegment(
+            final String streamName,
+            final LogSegmentRandomAccessEntryReader reader,
+            final LogSegmentMetadata metadata,
+            final ExecutorService executorService,
+            final int scanStartBatchSize,
+            final int scanMaxBatchSize,
+            final boolean includeControl,
+            final boolean includeEndOfStream,
+            final Promise<LogRecordWithDLSN> promise,
+            final AtomicInteger numRecordsScanned,
+            final LogRecordSelector selector,
+            final boolean backward,
+            final long startEntryId) {
+        final long lastAddConfirmed = reader.getLastAddConfirmed();
+        if (lastAddConfirmed >= 0) {
+            final ScanContext scan = new ScanContext(
+                    startEntryId, lastAddConfirmed,
+                    scanStartBatchSize, scanMaxBatchSize,
+                    includeControl, includeEndOfStream, backward, numRecordsScanned);
+            asyncReadRecordFromEntries(streamName, reader, metadata, executorService,
+                    promise, scan, selector);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName });
+            }
+            promise.setValue(null);
+        }
+    }
+
+ /**
+ * Open a random access reader on the log segment and scan it for a record chosen by
+ * <code>selector</code>.
+ *
+ * @param streamName
+ * fully qualified stream name (used for logging)
+ * @param l
+ * log segment metadata.
+ * @param fence
+ * whether to fence the log segment when opening the reader.
+ * @param includeControl
+ * whether control records are eligible.
+ * @param includeEndOfStream
+ * whether end of stream records are eligible.
+ * @param scanStartBatchSize
+ * number of entries in the first scan batch
+ * @param scanMaxBatchSize
+ * maximum number of entries in a scan batch
+ * @param numRecordsScanned
+ * counter incremented for every record visited by the scan
+ * @param executorService
+ * executor service used for running the callbacks
+ * @param entryStore
+ * log segment entry store
+ * @param selector
+ * selector used to pick the record
+ * @param backward
+ * true to scan from the end of the segment, false to scan from <code>startEntryId</code>
+ * @param startEntryId
+ * lower bound (inclusive) of the entries to scan
+ * @return a future with the selected record; it fails when opening or reading fails.
+ */
+ private static Future<LogRecordWithDLSN> asyncReadRecord(
+ final String streamName,
+ final LogSegmentMetadata l,
+ final boolean fence,
+ final boolean includeControl,
+ final boolean includeEndOfStream,
+ final int scanStartBatchSize,
+ final int scanMaxBatchSize,
+ final AtomicInteger numRecordsScanned,
+ final ExecutorService executorService,
+ final LogSegmentEntryStore entryStore,
+ final LogRecordSelector selector,
+ final boolean backward,
+ final long startEntryId) {
+
+ final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+
+ FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+ new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
+ @Override
+ public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} Opened log segment {} for reading record",
+ streamName, l);
+ }
+ // Attach the reader close to the promise before the scan is started.
+ // NOTE(review): presumably ensure() runs once the promise is resolved, on
+ // success or failure - confirm against the Future API.
+ promise.ensure(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ reader.asyncClose();
+ return BoxedUnit.UNIT;
+ }
+ });
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} {} scanning {}.", new Object[]{
+ (backward ? "backward" : "forward"), streamName, l});
+ }
+ asyncReadRecordFromLogSegment(
+ streamName, reader, l, executorService,
+ scanStartBatchSize, scanMaxBatchSize,
+ includeControl, includeEndOfStream,
+ promise, numRecordsScanned, selector, backward, startEntryId);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ // the reader could not be opened, so there is nothing to close
+ promise.setException(cause);
+ }
+ };
+ entryStore.openRandomAccessReader(l, fence)
+ .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+ return promise;
+ }
+
+ //
+ // Search Functions
+ //
+
+ /**
+ * Get the log record whose transaction id is not less than provided <code>transactionId</code>.
+ *
+ * <p>
+ * It uses a binary-search like algorithm to find the log record whose transaction id is not less than
+ * provided <code>transactionId</code> within a log <code>segment</code>. You could think of a log segment
+ * in terms of a sequence of records whose transaction ids are non-decreasing.
+ *
+ * - The sequence of records within a log segment is divided into N pieces.
+ * - Find the piece of records that contains a record whose transaction id is not less than provided
+ * <code>transactionId</code>.
+ *
+ * N could be chosen based on trading off concurrency and latency.
+ * </p>
+ *
+ * <p>
+ * Shortcuts taken before searching: a completed segment whose last transaction id is less than
+ * <code>transactionId</code> yields none without opening the segment; an empty segment yields none;
+ * a segment whose first transaction id is not less than <code>transactionId</code> is answered from
+ * entry 0 alone.
+ * </p>
+ *
+ * @param logName
+ * name of the log
+ * @param segment
+ * metadata of the log segment
+ * @param transactionId
+ * transaction id
+ * @param executorService
+ * executor service used for processing entries
+ * @param entryStore
+ * log segment entry store
+ * @param nWays
+ * number of entries to search in parallel
+ * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>.
+ */
+ public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
+ final String logName,
+ final LogSegmentMetadata segment,
+ final long transactionId,
+ final ExecutorService executorService,
+ final LogSegmentEntryStore entryStore,
+ final int nWays) {
+ // the last transaction id is only consulted for segments that are not in progress
+ if (!segment.isInProgress()) {
+ if (segment.getLastTxId() < transactionId) {
+ // all log records whose transaction id is less than provided transactionId
+ // then return none
+ Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
+ return Future.value(noneRecord);
+ }
+ }
+
+ final Promise<Optional<LogRecordWithDLSN>> promise =
+ new Promise<Optional<LogRecordWithDLSN>>();
+ final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+ new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
+ @Override
+ public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
+ // attach the reader close to the promise before any read is issued
+ promise.ensure(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ reader.asyncClose();
+ return BoxedUnit.UNIT;
+ }
+
+ });
+ long lastEntryId = reader.getLastAddConfirmed();
+ if (lastEntryId < 0) {
+ // it means that the log segment is created but not written yet or an empty log segment.
+ // it is equivalent to 'all log records whose transaction id is less than provided transactionId'
+ Optional<LogRecordWithDLSN> nonRecord = Optional.absent();
+ promise.setValue(nonRecord);
+ return;
+ }
+ // all log records whose transaction id is not less than provided transactionId
+ if (segment.getFirstTxId() >= transactionId) {
+ final FirstTxIdNotLessThanSelector selector =
+ new FirstTxIdNotLessThanSelector(transactionId);
+ // only entry 0 needs to be read in this case
+ asyncReadRecordFromEntries(
+ logName,
+ reader,
+ segment,
+ executorService,
+ new SingleEntryScanContext(0L),
+ selector
+ ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+ @Override
+ public void onSuccess(LogRecordWithDLSN value) {
+ // NOTE(review): Optional.of rejects null, so this assumes the selector always
+ // produced a record from entry 0 - confirm that an entry cannot be empty.
+ promise.setValue(Optional.of(selector.result()));
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ promise.setException(cause);
+ }
+ });
+
+ return;
+ }
+ // general case: start the search with the first and the last entry of the segment
+ getLogRecordNotLessThanTxIdFromEntries(
+ logName,
+ segment,
+ transactionId,
+ executorService,
+ reader,
+ Lists.newArrayList(0L, lastEntryId),
+ nWays,
+ Optional.<LogRecordWithDLSN>absent(),
+ promise);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ promise.setException(cause);
+ }
+ };
+
+ entryStore.openRandomAccessReader(segment, false)
+ .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+ return promise;
+ }
+
+ /**
+ * Find the log record whose transaction id is not less than provided <code>transactionId</code> from
+ * entries between <code>startEntryId</code> and <code>endEntryId</code>.
+ *
+ * @param logName
+ * name of the log
+ * @param segment
+ * log segment
+ * @param transactionId
+ * provided transaction id to search
+ * @param executorService
+ * executor service
+ * @param reader
+ * log segment random access reader
+ * @param entriesToSearch
+ * list of entries to search
+ * @param nWays
+ * how many entries to search in parallel
+ * @param prevFoundRecord
+ * the log record found in previous search
+ * @param promise
+ * promise to satisfy the result
+ */
+ private static void getLogRecordNotLessThanTxIdFromEntries(
+ final String logName,
+ final LogSegmentMetadata segment,
+ final long transactionId,
+ final ExecutorService executorService,
+ final LogSegmentRandomAccessEntryReader reader,
+ final List<Long> entriesToSearch,
+ final int nWays,
+ final Optional<LogRecordWithDLSN> prevFoundRecord,
+ final Promise<Optional<LogRecordWithDLSN>> promise) {
+ final List<Future<LogRecordWithDLSN>> searchResults =
+ Lists.newArrayListWithExpectedSize(entriesToSearch.size());
+ for (Long entryId : entriesToSearch) {
+ LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
+ Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries(
+ logName,
+ reader,
+ segment,
+ executorService,
+ new SingleEntryScanContext(entryId),
+ selector);
+ searchResults.add(searchResult);
+ }
+ FutureEventListener<List<LogRecordWithDLSN>> processSearchResultsListener =
+ new FutureEventListener<List<LogRecordWithDLSN>>() {
+ @Override
+ public void onSuccess(List<LogRecordWithDLSN> resultList) {
+ processSearchResults(
+ logName,
+ segment,
+ transactionId,
+ executorService,
+ reader,
+ resultList,
+ nWays,
+ prevFoundRecord,
+ promise);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ promise.setException(cause);
+ }
+ };
+ Future.collect(searchResults).addEventListener(
+ FutureEventListenerRunnable.of(processSearchResultsListener, executorService));
+ }
+
+    /**
+     * Process the results of one round of probes and either satisfy the promise or start
+     * the next, narrower round.
+     *
+     * <p><code>searchResults</code> holds one record per probed entry, in ascending entry order.
+     * The first record whose transaction id is not less than <code>transactionId</code> is the
+     * candidate:
+     * <ul>
+     * <li>no candidate: the promise is satisfied with <code>prevFoundRecord</code>;</li>
+     * <li>the candidate is conclusive (see below): the promise is satisfied with it;</li>
+     * <li>otherwise the entries between the previous probe and the candidate are searched,
+     * with the candidate as the new <code>prevFoundRecord</code>.</li>
+     * </ul>
+     *
+     * @param logName
+     *          name of the log
+     * @param segment
+     *          log segment
+     * @param transactionId
+     *          provided transaction id to search
+     * @param executorService
+     *          executor service
+     * @param reader
+     *          log segment random access reader
+     * @param searchResults
+     *          records returned by the probes of this round
+     * @param nWays
+     *          how many entries to search in parallel
+     * @param prevFoundRecord
+     *          the log record found in previous search
+     * @param promise
+     *          promise to satisfy the result
+     */
+    static void processSearchResults(
+            final String logName,
+            final LogSegmentMetadata segment,
+            final long transactionId,
+            final ExecutorService executorService,
+            final LogSegmentRandomAccessEntryReader reader,
+            final List<LogRecordWithDLSN> searchResults,
+            final int nWays,
+            final Optional<LogRecordWithDLSN> prevFoundRecord,
+            final Promise<Optional<LogRecordWithDLSN>> promise) {
+        int found = -1;
+        for (int i = 0; i < searchResults.size(); i++) {
+            LogRecordWithDLSN record = searchResults.get(i);
+            if (record.getTransactionId() >= transactionId) {
+                found = i;
+                break;
+            }
+        }
+        if (found == -1) { // all log records' transaction id is less than provided transaction id
+            promise.setValue(prevFoundRecord);
+            return;
+        }
+        // we found a log record
+        LogRecordWithDLSN foundRecord = searchResults.get(found);
+
+        // the candidate is conclusive when
+        // - it is not the first record of its entry
+        // - it comes from the first searched entry
+        // - its entry is adjacent to the previous searched entry
+        if (foundRecord.getDlsn().getSlotId() != 0L
+                || found == 0
+                || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) {
+            promise.setValue(Optional.of(foundRecord));
+            return;
+        }
+
+        // otherwise, we need to search the entries between the previous probe and the candidate
+        List<Long> nextSearchBatch = getEntriesToSearch(
+                transactionId,
+                searchResults.get(found - 1),
+                foundRecord,
+                nWays);
+        if (nextSearchBatch.isEmpty()) {
+            // Nothing left to refine with. The candidate satisfies the search and lies at or
+            // before any previously found record, so it is the answer; falling back to
+            // prevFoundRecord could return a later record or none at all.
+            promise.setValue(Optional.of(foundRecord));
+            return;
+        }
+        getLogRecordNotLessThanTxIdFromEntries(
+                logName,
+                segment,
+                transactionId,
+                executorService,
+                reader,
+                nextSearchBatch,
+                nWays,
+                Optional.of(foundRecord),
+                promise);
+    }
+
+ /**
+ * Get the entries to search provided <code>transactionId</code> between
+ * <code>firstRecord</code> and <code>lastRecord</code>. <code>firstRecord</code>
+ * and <code>lastRecord</code> are already searched, which the transaction id
+ * of <code>firstRecord</code> is less than <code>transactionId</code> and the
+ * transaction id of <code>lastRecord</code> is not less than <code>transactionId</code>.
+ *
+ * @param transactionId
+ * transaction id to search
+ * @param firstRecord
+ * log record that already searched whose transaction id is leass than <code>transactionId</code>.
+ * @param lastRecord
+ * log record that already searched whose transaction id is not less than <code>transactionId</code>.
+ * @param nWays
+ * N-ways to search
+ * @return the list of entries to search
+ */
+ static List<Long> getEntriesToSearch(
+ long transactionId,
+ LogRecordWithDLSN firstRecord,
+ LogRecordWithDLSN lastRecord,
+ int nWays) {
+ long txnDiff = lastRecord.getTransactionId() - firstRecord.getTransactionId();
+ if (txnDiff > 0) {
+ if (lastRecord.getTransactionId() == transactionId) {
+ List<Long> entries = getEntriesToSearch(
+ firstRecord.getDlsn().getEntryId() + 1,
+ lastRecord.getDlsn().getEntryId() - 2,
+ Math.max(MIN_SEARCH_BATCH_SIZE, nWays - 1));
+ entries.add(lastRecord.getDlsn().getEntryId() - 1);
+ return entries;
+ } else {
+ // TODO: improve it by estimating transaction ids.
+ return getEntriesToSearch(
+ firstRecord.getDlsn().getEntryId() + 1,
+ lastRecord.getDlsn().getEntryId() - 1,
+ nWays);
+ }
+ } else {
+ // unexpected condition
+ return Lists.newArrayList();
+ }
+ }
+
+ static List<Long> getEntriesToSearch(
+ long startEntryId,
+ long endEntryId,
+ int nWays) {
+ if (startEntryId > endEntryId) {
+ return Lists.newArrayList();
+ }
+ long numEntries = endEntryId - startEntryId + 1;
+ long step = Math.max(1L, numEntries / nWays);
+ List<Long> entryList = Lists.newArrayListWithExpectedSize(nWays);
+ for (long i = startEntryId, j = nWays - 1; i <= endEntryId && j > 0; i += step, j--) {
+ entryList.add(i);
+ }
+ if (entryList.get(entryList.size() - 1) < endEntryId) {
+ entryList.add(endEntryId);
+ }
+ return entryList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
new file mode 100644
index 0000000..d25d056
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.util.PermitLimiter;
+
+/**
+ * Write limiter that combines a per-stream permit limiter with a global permit limiter.
+ * A write is admitted only when both limiters grant a permit.
+ */
+public class WriteLimiter {
+
+    String streamName;
+    final PermitLimiter streamLimiter;
+    final PermitLimiter globalLimiter;
+
+    public WriteLimiter(String streamName, PermitLimiter streamLimiter, PermitLimiter globalLimiter) {
+        this.streamName = streamName;
+        this.streamLimiter = streamLimiter;
+        this.globalLimiter = globalLimiter;
+    }
+
+    /**
+     * Acquire one permit from the stream limiter and then one from the global limiter.
+     *
+     * @throws OverCapacityException
+     *          when either limiter refuses; a stream permit that was already taken is
+     *          released before the exception is propagated
+     */
+    public void acquire() throws OverCapacityException {
+        final boolean streamPermitGranted = streamLimiter.acquire();
+        if (!streamPermitGranted) {
+            throw new OverCapacityException(String.format("Stream write capacity exceeded for stream %s", streamName));
+        }
+        try {
+            final boolean globalPermitGranted = globalLimiter.acquire();
+            if (!globalPermitGranted) {
+                throw new OverCapacityException("Global write capacity exceeded");
+            }
+        } catch (OverCapacityException overCapacity) {
+            // give the stream permit back so a refused write does not hold capacity
+            streamLimiter.release(1);
+            throw overCapacity;
+        }
+    }
+
+    /**
+     * Release a single permit on both limiters.
+     */
+    public void release() {
+        release(1);
+    }
+
+    /**
+     * Release <code>permits</code> permits on the stream limiter and on the global limiter.
+     *
+     * @param permits
+     *          number of permits to release
+     */
+    public void release(int permits) {
+        streamLimiter.release(permits);
+        globalLimiter.release(permits);
+    }
+
+    /**
+     * Close the stream limiter and the global limiter.
+     */
+    public void close() {
+        streamLimiter.close();
+        globalLimiter.close();
+    }
+}