You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:31 UTC
[17/31] incubator-distributedlog git commit: DL-160: Remove 'DLSN'
suffix from async and sync readers
DL-160: Remove 'DLSN' suffix from async and sync readers
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c7751804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c7751804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c7751804
Branch: refs/heads/master
Commit: c7751804ef728c39d54b19e52f2db48d7fba9f65
Parents: 7a97797
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Dec 28 16:27:38 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:12:04 2016 -0800
----------------------------------------------------------------------
.../distributedlog/BKAsyncLogReader.java | 748 +++++++++++++++++++
.../distributedlog/BKAsyncLogReaderDLSN.java | 748 -------------------
.../distributedlog/BKDistributedLogManager.java | 8 +-
.../twitter/distributedlog/BKSyncLogReader.java | 276 +++++++
.../distributedlog/BKSyncLogReaderDLSN.java | 276 -------
.../src/main/resources/findbugsExclude.xml | 2 +-
.../NonBlockingReadsTestUtil.java | 6 +-
.../distributedlog/TestAsyncReaderLock.java | 6 +-
.../distributedlog/TestAsyncReaderWriter.java | 14 +-
.../distributedlog/TestBKSyncLogReader.java | 6 +-
.../TestNonBlockingReadsMultiReader.java | 2 +-
.../com/twitter/distributedlog/TestReader.java | 2 +-
.../distributedlog/TestRollLogSegments.java | 4 +-
13 files changed, 1049 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
new file mode 100644
index 0000000..18d2e15
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.EndOfStreamException;
+import com.twitter.distributedlog.exceptions.IdleReaderException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Throw;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * BookKeeper based {@link AsyncLogReader} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `async_reader`.
+ * <ul>
+ * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
+ * if it is high, it means that the caller takes time on processing the result of read requests.
+ * The side effect is blocking subsequent reads.
+ * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
+ * <li> `async_reader`/background_read: opstats. time spent on background reads.
+ * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
+ * <li> `async_reader`/time_between_read_next: opstats. time spent between two consecutive {@link #readNext()} calls.
+ * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
+ * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
+ * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
+ * </ul>
+ */
+class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
+ static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
+
+ // Unwraps the list produced by a read request into its first record; readNext() applies it
+ // to the result of a single-entry read.
+ private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
+         new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() {
+             @Override
+             public LogRecordWithDLSN apply(List<LogRecordWithDLSN> readRecords) {
+                 final LogRecordWithDLSN first = readRecords.get(0);
+                 return first;
+             }
+         };
+
+ protected final BKDistributedLogManager bkDistributedLogManager;
+ protected final BKLogReadHandler readHandler;
+ private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
+ private final ScheduledExecutorService executorService;
+ private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
+ private final Object scheduleLock = new Object();
+ private final AtomicLong scheduleCount = new AtomicLong(0);
+ final private Stopwatch scheduleDelayStopwatch;
+ final private Stopwatch readNextDelayStopwatch;
+ private DLSN startDLSN;
+ private ReadAheadEntryReader readAheadReader = null;
+ private int lastPosition = 0;
+ private final boolean positionGapDetectionEnabled;
+ private final int idleErrorThresholdMillis;
+ final ScheduledFuture<?> idleReaderTimeoutTask;
+ private ScheduledFuture<?> backgroundScheduleTask = null;
+ // last process time
+ private final Stopwatch lastProcessTime;
+
+ protected Promise<Void> closeFuture = null;
+
+ private boolean lockStream = false;
+
+ private final boolean returnEndOfStreamRecord;
+
+ // Delayed task armed by run() when a partially filled request may still wait for more records:
+ // it forgets its own handle and then triggers another background read.
+ private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
+     @Override
+     public void run() {
+         // the handle is only touched while holding scheduleLock
+         synchronized (scheduleLock) {
+             backgroundScheduleTask = null;
+         }
+         BKAsyncLogReader.this.scheduleBackgroundRead();
+     }
+ };
+
+ // State
+ private Entry.Reader currentEntry = null;
+ private LogRecordWithDLSN nextRecord = null;
+
+ // Failure Injector
+ private boolean disableProcessingReadRequests = false;
+
+ // Stats
+ private final OpStatsLogger readNextExecTime;
+ private final OpStatsLogger delayUntilPromiseSatisfied;
+ private final OpStatsLogger timeBetweenReadNexts;
+ private final OpStatsLogger futureSetLatency;
+ private final OpStatsLogger scheduleLatency;
+ private final OpStatsLogger backgroundReaderRunTime;
+ private final Counter idleReaderCheckCount;
+ private final Counter idleReaderCheckIdleReadRequestCount;
+ private final Counter idleReaderCheckIdleReadAheadCount;
+ private final Counter idleReaderError;
+
+ /**
+  * A queued read request. It collects the records read on its behalf and owns the
+  * promise that is handed back to the caller of the read methods.
+  */
+ private class PendingReadRequest {
+     private final Stopwatch enqueueTime;
+     private final int numEntries;
+     private final List<LogRecordWithDLSN> records;
+     private final Promise<List<LogRecordWithDLSN>> promise;
+     private final long deadlineTime;
+     private final TimeUnit deadlineTimeUnit;
+
+     PendingReadRequest(int numEntries,
+                        long deadlineTime,
+                        TimeUnit deadlineTimeUnit) {
+         this.enqueueTime = Stopwatch.createStarted();
+         this.numEntries = numEntries;
+         this.deadlineTime = deadlineTime;
+         this.deadlineTimeUnit = deadlineTimeUnit;
+         this.promise = new Promise<List<LogRecordWithDLSN>>();
+         // a single-record read gets a one-slot list to keep the footprint small
+         this.records = (1 == numEntries)
+                 ? new ArrayList<LogRecordWithDLSN>(1)
+                 : new ArrayList<LogRecordWithDLSN>();
+     }
+
+     /** @return the promise satisfied when this request completes or fails. */
+     Promise<List<LogRecordWithDLSN>> getPromise() {
+         return this.promise;
+     }
+
+     /** @return time elapsed since this request was created, in the given unit. */
+     long elapsedSinceEnqueue(TimeUnit timeUnit) {
+         return this.enqueueTime.elapsed(timeUnit);
+     }
+
+     /** Fail the promise with <i>throwable</i> unless it has already been satisfied. */
+     void setException(Throwable throwable) {
+         final Stopwatch setTimer = Stopwatch.createStarted();
+         final boolean updated = promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable));
+         if (!updated) {
+             return;
+         }
+         // stats are only recorded when this call actually satisfied the promise
+         futureSetLatency.registerFailedEvent(setTimer.stop().elapsed(TimeUnit.MICROSECONDS));
+         delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+     }
+
+     /** @return true once at least one record has been added. */
+     boolean hasReadRecords() {
+         return !records.isEmpty();
+     }
+
+     /** @return true once the number of added records reached the requested count. */
+     boolean hasReadEnoughRecords() {
+         return numEntries <= records.size();
+     }
+
+     /**
+      * @return the deadline minus the time elapsed since enqueue (in the deadline's unit),
+      *         or 0 when no positive deadline was given.
+      */
+     long getRemainingWaitTime() {
+         return (deadlineTime <= 0L)
+                 ? 0L
+                 : deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
+     }
+
+     void addRecord(LogRecordWithDLSN record) {
+         this.records.add(record);
+     }
+
+     /** Satisfy the promise with the records collected so far and record latency stats. */
+     void complete() {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
+         }
+         final long sinceEnqueueMicros = enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS);
+         delayUntilPromiseSatisfied.registerSuccessfulEvent(sinceEnqueueMicros);
+         final Stopwatch setTimer = Stopwatch.createStarted();
+         promise.setValue(records);
+         futureSetLatency.registerSuccessfulEvent(setTimer.stop().elapsed(TimeUnit.MICROSECONDS));
+     }
+ }
+
+ /**
+  * Create an async reader on the stream managed by <i>bkdlm</i>.
+  *
+  * @param bkdlm
+  *          log manager that provides the read handler, configuration and scheduler
+  * @param executorService
+  *          executor used to run background reads and the idle reader check
+  * @param startDLSN
+  *          position to start reading from; records with a smaller DLSN are skipped
+  * @param subscriberId
+  *          subscriber id passed to the read handler
+  * @param returnEndOfStreamRecord
+  *          when false, reading an end-of-stream record puts the reader into error
+  *          with an {@link EndOfStreamException} instead of returning the record
+  * @param statsLogger
+  *          stats logger; all metrics are registered under the `async_reader` scope
+  */
+ BKAsyncLogReader(BKDistributedLogManager bkdlm,
+                  ScheduledExecutorService executorService,
+                  DLSN startDLSN,
+                  Optional<String> subscriberId,
+                  boolean returnEndOfStreamRecord,
+                  StatsLogger statsLogger) {
+     this.bkDistributedLogManager = bkdlm;
+     this.executorService = executorService;
+     this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+             this, true);
+     LOG.debug("Starting async reader at {}", startDLSN);
+     this.startDLSN = startDLSN;
+     this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
+     this.readNextDelayStopwatch = Stopwatch.createStarted();
+     this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
+     this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
+     this.returnEndOfStreamRecord = returnEndOfStreamRecord;
+
+     // Stats
+     StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
+     futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
+     scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
+     backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
+     readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
+     timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
+     delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
+     idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
+     idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
+     idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
+     idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
+
+     // The stream is not locked at construction time; lockStream() turns locking on.
+     this.lockStream = false;
+     // The idle reader task reads lastProcessTime, so it must be assigned before the task
+     // is handed to the executor.
+     this.lastProcessTime = Stopwatch.createStarted();
+     this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
+ }
+
+ /**
+  * Schedule the periodic idle reader check, unless the idle error threshold is
+  * {@link Integer#MAX_VALUE} or larger.
+  *
+  * <p>The check fails all pending reads with an {@link IdleReaderException} when the oldest
+  * pending request has waited longer than the threshold and either no readahead reader exists,
+  * the readahead is idle with nothing buffered, or the background reader has not processed a
+  * request for longer than the threshold.
+  *
+  * @return handle of the scheduled task, or null if no task was scheduled.
+  */
+ private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+     if (idleErrorThresholdMillis >= Integer.MAX_VALUE) {
+         return null;
+     }
+     // Don't run the task more than once every second (for sanity)
+     long period = Math.max(idleErrorThresholdMillis / 10, 1000);
+     // Except when idle reader threshold is less than a second (tests?)
+     period = Math.min(period, idleErrorThresholdMillis / 5);
+     // scheduleAtFixedRate rejects a period <= 0, which the integer division above
+     // yields for thresholds below 5ms
+     period = Math.max(period, 1L);
+
+     return executorService.scheduleAtFixedRate(new Runnable() {
+         @Override
+         public void run() {
+             PendingReadRequest nextRequest = pendingRequests.peek();
+
+             idleReaderCheckCount.inc();
+             if (null == nextRequest) {
+                 return;
+             }
+
+             idleReaderCheckIdleReadRequestCount.inc();
+             if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
+                 return;
+             }
+
+             ReadAheadEntryReader reader = getReadAheadReader();
+
+             // read request has been idle
+             // - cache has records but read request are idle,
+             // that means notification was missed between readahead and reader.
+             // - cache is empty and readahead is idle (no records added for a long time)
+             idleReaderCheckIdleReadAheadCount.inc();
+             try {
+                 if (null == reader || (!hasMoreRecords() &&
+                         reader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+                     markReaderAsIdle();
+                 } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+                     markReaderAsIdle();
+                 }
+             } catch (IOException e) {
+                 setLastException(e);
+             }
+         }
+     }, period, period, TimeUnit.MILLISECONDS);
+ }
+
+ synchronized ReadAheadEntryReader getReadAheadReader() {
+ return readAheadReader;
+ }
+
+ /**
+  * Cancel the periodic idle reader check, if one was scheduled. Failures to cancel
+  * are logged and otherwise ignored.
+  */
+ void cancelIdleReaderTask() {
+     if (null == idleReaderTimeoutTask) {
+         return;
+     }
+     try {
+         idleReaderTimeoutTask.cancel(true);
+     } catch (Exception exc) {
+         // best effort: keep going, but keep the cause in the log
+         LOG.info("{}: Failed to cancel the background idle reader timeout task",
+                 readHandler.getFullyQualifiedName(), exc);
+     }
+ }
+
+ private void markReaderAsIdle() {
+ idleReaderError.inc();
+ IdleReaderException ire = new IdleReaderException("Reader on stream "
+ + readHandler.getFullyQualifiedName()
+ + " is idle for " + idleErrorThresholdMillis +" ms");
+ setLastException(ire);
+ // cancel all pending reads directly rather than notifying on error
+ // because idle reader could happen on idle read requests that usually means something wrong
+ // in scheduling reads
+ cancelAllPendingReads(ire);
+ }
+
+ protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
+ if (null != readAheadReader) {
+ throw new UnexpectedException("Could't reset from dlsn after reader already starts reading.");
+ }
+ startDLSN = fromDLSN;
+ }
+
+ /**
+  * @return the position this reader starts reading from.
+  */
+ @VisibleForTesting
+ public synchronized DLSN getStartDLSN() {
+     final DLSN start = this.startDLSN;
+     return start;
+ }
+
+ /**
+  * Ask the read handler to lock the stream and remember that locking is in use, so that
+  * later state checks also verify the read lock.
+  *
+  * @return the future returned by the read handler's lock request.
+  */
+ public Future<Void> lockStream() {
+     lockStream = true;
+     final Future<Void> lockResult = readHandler.lockStream();
+     return lockResult;
+ }
+
+ private boolean checkClosedOrInError(String operation) {
+ if (null == lastException.get()) {
+ try {
+ if (null != readHandler && null != getReadAheadReader()) {
+ getReadAheadReader().checkLastException();
+ }
+
+ bkDistributedLogManager.checkClosedOrInError(operation);
+ } catch (IOException exc) {
+ setLastException(exc);
+ }
+ }
+
+ if (lockStream) {
+ try {
+ readHandler.checkReadLock();
+ } catch (IOException ex) {
+ setLastException(ex);
+ }
+ }
+
+ if (null != lastException.get()) {
+ LOG.trace("Cancelling pending reads");
+ cancelAllPendingReads(lastException.get());
+ return true;
+ }
+
+ return false;
+ }
+
+ private void setLastException(IOException exc) {
+ lastException.compareAndSet(null, exc);
+ }
+
+ /**
+  * @return the stream name reported by the log manager.
+  */
+ @Override
+ public String getStreamName() {
+     final String streamName = bkDistributedLogManager.getStreamName();
+     return streamName;
+ }
+
+ /**
+  * Read the next record: issues a single-entry read with no wait time and unwraps the result.
+  *
+  * @return A promise that when satisfied will contain the Log Record with its DLSN.
+  */
+ @Override
+ public synchronized Future<LogRecordWithDLSN> readNext() {
+     final Future<List<LogRecordWithDLSN>> singleRead = readInternal(1, 0, TimeUnit.MILLISECONDS);
+     return singleRead.map(READ_NEXT_MAP_FUNCTION);
+ }
+
+ /**
+  * Read up to <i>numEntries</i> records with no wait time.
+  *
+  * @param numEntries maximum number of records to read
+  * @return a promise for the list of records read.
+  */
+ public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+     final long noWait = 0;
+     return readInternal(numEntries, noWait, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Read up to <i>numEntries</i> records, letting the request wait up to <i>waitTime</i>
+  * for more records once it holds at least one.
+  *
+  * @param numEntries maximum number of records to read
+  * @param waitTime wait time for the request
+  * @param timeUnit unit of <i>waitTime</i>
+  * @return a promise for the list of records read.
+  */
+ @Override
+ public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
+                                                              long waitTime,
+                                                              TimeUnit timeUnit) {
+     final Future<List<LogRecordWithDLSN>> bulkRead = readInternal(numEntries, waitTime, timeUnit);
+     return bulkRead;
+ }
+
+ /**
+ * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are
+ * ready (1 to <i>numEntries</i>).
+ *
+ * <p>The first call lazily creates the readahead reader and starts it once the log stream is
+ * known to exist. The request is then queued, or failed right away if the reader is closed
+ * or in error.
+ *
+ * @param numEntries
+ * num entries to read
+ * @param deadlineTime
+ * how long a request that already holds some records may keep waiting for more before
+ * it is completed; a value <= 0 means it completes as soon as any record is available
+ * @param deadlineTimeUnit
+ * unit of <i>deadlineTime</i>
+ * @return A promise that is satisfied with a non-empty list of log records with their DLSN.
+ */
+ private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
+ long deadlineTime,
+ TimeUnit deadlineTimeUnit) {
+ // record the gap since the previous read call finished, then restart the stopwatch
+ // to time this call
+ timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+ readNextDelayStopwatch.reset().start();
+ final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
+
+ // lazily create the readahead reader on the first read
+ if (null == readAheadReader) {
+ try {
+ final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+ getStreamName(),
+ getStartDLSN(),
+ bkDistributedLogManager.getConf(),
+ readHandler,
+ bkDistributedLogManager.getReaderEntryStore(),
+ bkDistributedLogManager.getScheduler(),
+ Ticker.systemTicker(),
+ bkDistributedLogManager.alertStatsLogger);
+ readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ try {
+ readHandler.registerListener(readAheadEntryReader);
+ // NOTE(review): the future returned by map(...) is not observed here, so a failure
+ // while fetching log segments does not appear to reach notifyOnError from this
+ // code path - confirm it is surfaced elsewhere.
+ readHandler.asyncStartFetchLogSegments()
+ .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+ // subscribe this reader to readahead state changes before starting it
+ readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+ readAheadEntryReader.start(logSegments.getValue());
+ return BoxedUnit.UNIT;
+ }
+ });
+ } catch (Exception exc) {
+ notifyOnError(exc);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ notifyOnError(cause);
+ }
+ });
+ } catch (IOException ioe) {
+ notifyOnError(ioe);
+ }
+ }
+
+ if (checkClosedOrInError("readNext")) {
+ // reader is closed or in error: fail this request immediately instead of queueing it
+ readRequest.setException(lastException.get());
+ } else {
+ boolean queueEmpty = pendingRequests.isEmpty();
+ pendingRequests.add(readRequest);
+
+ // only kick the background reader when the queue was empty before this request was added
+ if (queueEmpty) {
+ scheduleBackgroundRead();
+ }
+ }
+
+ // time spent inside this call; the stopwatch is restarted so that the next call can
+ // measure the time between reads
+ readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+ readNextDelayStopwatch.reset().start();
+
+ return readRequest.getPromise();
+ }
+
+ /**
+  * Request a background read. The schedule count is always bumped for an open reader, but the
+  * reader is only submitted to the executor when the count was zero before this call.
+  * Does nothing once the reader has been closed.
+  */
+ public synchronized void scheduleBackgroundRead() {
+     // an open reader has no close future yet
+     if (null == closeFuture) {
+         final boolean notYetScheduled = (0 == scheduleCount.getAndIncrement());
+         if (notYetScheduled) {
+             scheduleDelayStopwatch.reset().start();
+             executorService.submit(this);
+         }
+     }
+ }
+
+ /**
+ * Close the reader. The first call marks the reader as closed with a
+ * {@link ReadCancelledException}, cancels the idle reader check and any delayed background
+ * read, fails all pending reads, detaches from the readahead reader and then closes the
+ * readahead reader and the read handler. Later calls return the same future.
+ *
+ * @return future satisfied when the close sequence completes.
+ */
+ @Override
+ public Future<Void> asyncClose() {
+ ReadCancelledException exception;
+ Promise<Void> closePromise;
+ synchronized (this) {
+ // already closing or closed: hand back the existing future
+ if (null != closeFuture) {
+ return closeFuture;
+ }
+ closePromise = closeFuture = new Promise<Void>();
+ exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
+ setLastException(exception);
+ }
+
+ // Do this after we have checked that the reader was not previously closed
+ cancelIdleReaderTask();
+
+ // cancel a delayed background read, if one is armed
+ synchronized (scheduleLock) {
+ if (null != backgroundScheduleTask) {
+ backgroundScheduleTask.cancel(true);
+ }
+ }
+
+ cancelAllPendingReads(exception);
+
+ ReadAheadEntryReader readAheadReader = getReadAheadReader();
+ if (null != readAheadReader) {
+ readHandler.unregisterListener(readAheadReader);
+ readAheadReader.removeStateChangeNotification(this);
+ }
+ // readAheadReader is passed on even when it is null (no read was ever issued)
+ Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+ readAheadReader,
+ readHandler
+ ).proxyTo(closePromise);
+ return closePromise;
+ }
+
+ /**
+  * Fail every pending read request with <i>throwExc</i> and remove it from the queue.
+  *
+  * <p>Requests are drained one at a time so that each request removed from the queue is
+  * also failed; iterating and then clearing the queue could drop a request that was
+  * enqueued in between without ever satisfying its promise.
+  *
+  * @param throwExc exception used to fail the pending requests
+  */
+ private void cancelAllPendingReads(Throwable throwExc) {
+     PendingReadRequest request;
+     while (null != (request = pendingRequests.poll())) {
+         request.setException(throwExc);
+     }
+ }
+
+ synchronized boolean hasMoreRecords() throws IOException {
+ if (null == readAheadReader) {
+ return false;
+ }
+ if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
+ return true;
+ } else if (null != currentEntry) {
+ nextRecord = currentEntry.nextRecord();
+ return null != nextRecord;
+ }
+ return false;
+ }
+
+ private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+ if (null == readAheadReader) {
+ return null;
+ }
+ if (null == currentEntry) {
+ currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+ // no entry after reading from read ahead then return null
+ if (null == currentEntry) {
+ return null;
+ }
+ }
+
+ LogRecordWithDLSN recordToReturn;
+ if (null == nextRecord) {
+ nextRecord = currentEntry.nextRecord();
+ // no more records in current entry
+ if (null == nextRecord) {
+ currentEntry = null;
+ return readNextRecord();
+ }
+ }
+
+ // found a record to return and prefetch the next one
+ recordToReturn = nextRecord;
+ nextRecord = currentEntry.nextRecord();
+ return recordToReturn;
+ }
+
+ /**
+ * Background read pass. Repeatedly takes the oldest pending request, fills it with records
+ * from readahead (skipping control records and records before the start DLSN) and completes
+ * it. The pass ends when the queue is empty, when the reader is closed or in error, when a
+ * partially filled request is parked until its wait time expires, or when no records are
+ * available and no further schedule requests were counted.
+ */
+ @Override
+ public void run() {
+ // the whole pass runs under scheduleLock, which also guards backgroundScheduleTask
+ synchronized(scheduleLock) {
+ if (scheduleDelayStopwatch.isRunning()) {
+ scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ }
+
+ Stopwatch runTime = Stopwatch.createStarted();
+ int iterations = 0;
+ // snapshot of the number of schedule requests counted so far
+ long scheduleCountLocal = scheduleCount.get();
+ LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
+ while(true) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
+ }
+
+ PendingReadRequest nextRequest = null;
+ synchronized(this) {
+ nextRequest = pendingRequests.peek();
+
+ // Queue is empty, nothing to read, return
+ if (null == nextRequest) {
+ LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
+ // reset the count so that the next scheduleBackgroundRead() submits this reader again
+ scheduleCount.set(0);
+ backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ return;
+ }
+
+ // test hook: leave the request queued and stop processing
+ if (disableProcessingReadRequests) {
+ LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
+ return;
+ }
+ }
+ // remember when a request was last picked up; the idle reader check reads this
+ lastProcessTime.reset().start();
+
+ // If the oldest pending promise is interrupted then we must mark
+ // the reader in error and abort all pending reads since we don't
+ // know the last consumed read
+ if (null == lastException.get()) {
+ if (nextRequest.getPromise().isInterrupted().isDefined()) {
+ setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
+ nextRequest.getPromise().isInterrupted().get()));
+ }
+ }
+
+ // fails all pending reads and ends the pass if the reader is closed or in error
+ if (checkClosedOrInError("readNext")) {
+ if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
+ LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
+ }
+ backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ return;
+ }
+
+ try {
+ // Fail 10% of the requests when asked to simulate errors
+ if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
+ throw new IOException("Reader Simulated Exception");
+ }
+ LogRecordWithDLSN record;
+ while (!nextRequest.hasReadEnoughRecords()) {
+ // read single record
+ // (control records and records before the start DLSN are skipped)
+ do {
+ record = readNextRecord();
+ } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
+ if (null == record) {
+ // nothing more available right now
+ break;
+ } else {
+ if (record.isEndOfStream() && !returnEndOfStreamRecord) {
+ setLastException(new EndOfStreamException("End of Stream Reached for "
+ + readHandler.getFullyQualifiedName()));
+ break;
+ }
+
+ // gap detection
+ // (an alert is always raised; the read only fails when gap detection is enabled)
+ if (recordPositionsContainsGap(record, lastPosition)) {
+ bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
+ if (positionGapDetectionEnabled) {
+ throw new DLIllegalStateException("Gap detected between records at record = " + record);
+ }
+ }
+ lastPosition = record.getLastPositionWithinLogSegment();
+
+ nextRequest.addRecord(record);
+ }
+ };
+ } catch (IOException exc) {
+ setLastException(exc);
+ if (!(exc instanceof LogNotFoundException)) {
+ LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
+ }
+ // loop again: the error check at the top of the loop cancels pending reads and returns
+ continue;
+ }
+
+ if (nextRequest.hasReadRecords()) {
+ long remainingWaitTime = nextRequest.getRemainingWaitTime();
+ if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
+ backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ scheduleDelayStopwatch.reset().start();
+ scheduleCount.set(0);
+ // the request could still wait for more records
+ backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit);
+ return;
+ }
+
+ // the request is done: it must still be the head of the queue
+ PendingReadRequest request = pendingRequests.poll();
+ if (null != request && nextRequest == request) {
+ request.complete();
+ if (null != backgroundScheduleTask) {
+ backgroundScheduleTask.cancel(true);
+ backgroundScheduleTask = null;
+ }
+ } else {
+ DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
+ + nextRequest.records.get(0).getDlsn());
+ nextRequest.setException(ise);
+ if (null != request) {
+ request.setException(ise);
+ }
+ // We should never get here as we should have exited the loop if
+ // pendingRequests were empty
+ bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
+ nextRequest.records.get(0).getDlsn());
+ setLastException(ise);
+ }
+ } else {
+ // no records were available for the oldest request
+ if (0 == scheduleCountLocal) {
+ LOG.trace("Schedule count dropping to zero", lastException.get());
+ backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ return;
+ }
+ // consume one counted schedule request and try again
+ scheduleCountLocal = scheduleCount.decrementAndGet();
+ }
+ }
+ }
+ }
+
+ /**
+  * Decide whether <i>record</i> leaves a gap after the previously read position.
+  *
+  * @param record the record just read
+  * @param lastPosition last position of the previously read record, 0 if none was recorded
+  * @return true if the record is neither the first record of a log segment nor an
+  *         end-of-stream record, a previous position exists, and the record's position is not
+  *         exactly one past it.
+  */
+ private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
+     final long position = record.getPositionWithinLogSegment();
+     if (1 == position) {
+         // first record of a log segment
+         return false;
+     }
+     if (record.isEndOfStream()) {
+         return false;
+     }
+     if (0 == lastPosition) {
+         // nothing was read before, so there is nothing to compare against
+         return false;
+     }
+     return position != (lastPosition + 1);
+ }
+
+ /**
+  * Triggered when the background activity encounters an exception. The cause is recorded
+  * as the reader's error (wrapped in an {@link IOException} if it is not one already) and a
+  * background read is scheduled.
+  */
+ @Override
+ public void notifyOnError(Throwable cause) {
+     final IOException failure = (cause instanceof IOException)
+             ? (IOException) cause
+             : new IOException(cause);
+     setLastException(failure);
+     scheduleBackgroundRead();
+ }
+
+ /**
+  * Triggered when the background activity completes an operation; schedules a
+  * background read.
+  */
+ @Override
+ public void notifyOnOperationComplete() {
+     this.scheduleBackgroundRead();
+ }
+
+ @VisibleForTesting
+ void simulateErrors() {
+ bkDistributedLogManager.getFailureInjector().injectErrors(true);
+ }
+
+ /**
+  * Test hook: forwards to the read handler to disable readahead log segments notification.
+  */
+ @VisibleForTesting
+ synchronized void disableReadAheadLogSegmentsNotification() {
+     this.readHandler.disableReadAheadLogSegmentsNotification();
+ }
+
+ /**
+  * Test hook: make the background reader stop processing read requests.
+  */
+ @VisibleForTesting
+ synchronized void disableProcessingReadRequests() {
+     this.disableProcessingReadRequests = true;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
deleted file mode 100644
index e347012..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ /dev/null
@@ -1,748 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ReadCancelledException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Throw;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * BookKeeper based {@link AsyncLogReader} implementation.
- *
- * <h3>Metrics</h3>
- * All the metrics are exposed under `async_reader`.
- * <ul>
- * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
- * if it is high, it means that the caller takes time on processing the result of read requests.
- * The side effect is blocking consequent reads.
- * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
- * <li> `async_reader`/background_read: opstats. time spent on background reads.
- * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
- * <li> `async_reader`/time_between_read_next: opstats. time spent on between two consequent {@link #readNext()}.
- * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
- * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
- * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
- * </ul>
- */
-class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
- static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
-
- private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
- new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() {
- @Override
- public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
- return records.get(0);
- }
- };
-
- protected final BKDistributedLogManager bkDistributedLogManager;
- protected final BKLogReadHandler readHandler;
- private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
- private final ScheduledExecutorService executorService;
- private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
- private final Object scheduleLock = new Object();
- private final AtomicLong scheduleCount = new AtomicLong(0);
- final private Stopwatch scheduleDelayStopwatch;
- final private Stopwatch readNextDelayStopwatch;
- private DLSN startDLSN;
- private ReadAheadEntryReader readAheadReader = null;
- private int lastPosition = 0;
- private final boolean positionGapDetectionEnabled;
- private final int idleErrorThresholdMillis;
- final ScheduledFuture<?> idleReaderTimeoutTask;
- private ScheduledFuture<?> backgroundScheduleTask = null;
- // last process time
- private final Stopwatch lastProcessTime;
-
- protected Promise<Void> closeFuture = null;
-
- private boolean lockStream = false;
-
- private final boolean returnEndOfStreamRecord;
-
- private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
- @Override
- public void run() {
- synchronized (scheduleLock) {
- backgroundScheduleTask = null;
- }
- scheduleBackgroundRead();
- }
- };
-
- // State
- private Entry.Reader currentEntry = null;
- private LogRecordWithDLSN nextRecord = null;
-
- // Failure Injector
- private boolean disableProcessingReadRequests = false;
-
- // Stats
- private final OpStatsLogger readNextExecTime;
- private final OpStatsLogger delayUntilPromiseSatisfied;
- private final OpStatsLogger timeBetweenReadNexts;
- private final OpStatsLogger futureSetLatency;
- private final OpStatsLogger scheduleLatency;
- private final OpStatsLogger backgroundReaderRunTime;
- private final Counter idleReaderCheckCount;
- private final Counter idleReaderCheckIdleReadRequestCount;
- private final Counter idleReaderCheckIdleReadAheadCount;
- private final Counter idleReaderError;
-
- private class PendingReadRequest {
- private final Stopwatch enqueueTime;
- private final int numEntries;
- private final List<LogRecordWithDLSN> records;
- private final Promise<List<LogRecordWithDLSN>> promise;
- private final long deadlineTime;
- private final TimeUnit deadlineTimeUnit;
-
- PendingReadRequest(int numEntries,
- long deadlineTime,
- TimeUnit deadlineTimeUnit) {
- this.numEntries = numEntries;
- this.enqueueTime = Stopwatch.createStarted();
- // optimize the space usage for single read.
- if (numEntries == 1) {
- this.records = new ArrayList<LogRecordWithDLSN>(1);
- } else {
- this.records = new ArrayList<LogRecordWithDLSN>();
- }
- this.promise = new Promise<List<LogRecordWithDLSN>>();
- this.deadlineTime = deadlineTime;
- this.deadlineTimeUnit = deadlineTimeUnit;
- }
-
- Promise<List<LogRecordWithDLSN>> getPromise() {
- return promise;
- }
-
- long elapsedSinceEnqueue(TimeUnit timeUnit) {
- return enqueueTime.elapsed(timeUnit);
- }
-
- void setException(Throwable throwable) {
- Stopwatch stopwatch = Stopwatch.createStarted();
- if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) {
- futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
- }
- }
-
- boolean hasReadRecords() {
- return records.size() > 0;
- }
-
- boolean hasReadEnoughRecords() {
- return records.size() >= numEntries;
- }
-
- long getRemainingWaitTime() {
- if (deadlineTime <= 0L) {
- return 0L;
- }
- return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
- }
-
- void addRecord(LogRecordWithDLSN record) {
- records.add(record);
- }
-
- void complete() {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
- }
- delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
- Stopwatch stopwatch = Stopwatch.createStarted();
- promise.setValue(records);
- futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- }
- }
-
- BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm,
- ScheduledExecutorService executorService,
- DLSN startDLSN,
- Optional<String> subscriberId,
- boolean returnEndOfStreamRecord,
- StatsLogger statsLogger) {
- this.bkDistributedLogManager = bkdlm;
- this.executorService = executorService;
- this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
- this, true);
- LOG.debug("Starting async reader at {}", startDLSN);
- this.startDLSN = startDLSN;
- this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
- this.readNextDelayStopwatch = Stopwatch.createStarted();
- this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
- this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
- this.returnEndOfStreamRecord = returnEndOfStreamRecord;
-
- // Stats
- StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
- futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
- scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
- backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
- readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
- timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
- delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
- idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
- idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
- idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
- idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
-
- // Lock the stream if requested. The lock will be released when the reader is closed.
- this.lockStream = false;
- this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
- this.lastProcessTime = Stopwatch.createStarted();
- }
-
- private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
- if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
- // Dont run the task more than once every seconds (for sanity)
- long period = Math.max(idleErrorThresholdMillis / 10, 1000);
- // Except when idle reader threshold is less than a second (tests?)
- period = Math.min(period, idleErrorThresholdMillis / 5);
-
- return executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- PendingReadRequest nextRequest = pendingRequests.peek();
-
- idleReaderCheckCount.inc();
- if (null == nextRequest) {
- return;
- }
-
- idleReaderCheckIdleReadRequestCount.inc();
- if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
- return;
- }
-
- ReadAheadEntryReader readAheadReader = getReadAheadReader();
-
- // read request has been idle
- // - cache has records but read request are idle,
- // that means notification was missed between readahead and reader.
- // - cache is empty and readahead is idle (no records added for a long time)
- idleReaderCheckIdleReadAheadCount.inc();
- try {
- if (null == readAheadReader || (!hasMoreRecords() &&
- readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
- markReaderAsIdle();
- return;
- } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
- markReaderAsIdle();;
- }
- } catch (IOException e) {
- setLastException(e);
- return;
- }
- }
- }, period, period, TimeUnit.MILLISECONDS);
- }
- return null;
- }
-
- synchronized ReadAheadEntryReader getReadAheadReader() {
- return readAheadReader;
- }
-
- void cancelIdleReaderTask() {
- // Do this after we have checked that the reader was not previously closed
- try {
- if (null != idleReaderTimeoutTask) {
- idleReaderTimeoutTask.cancel(true);
- }
- } catch (Exception exc) {
- LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName());
- }
- }
-
- private void markReaderAsIdle() {
- idleReaderError.inc();
- IdleReaderException ire = new IdleReaderException("Reader on stream "
- + readHandler.getFullyQualifiedName()
- + " is idle for " + idleErrorThresholdMillis +" ms");
- setLastException(ire);
- // cancel all pending reads directly rather than notifying on error
- // because idle reader could happen on idle read requests that usually means something wrong
- // in scheduling reads
- cancelAllPendingReads(ire);
- }
-
- protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
- if (null != readAheadReader) {
- throw new UnexpectedException("Could't reset from dlsn after reader already starts reading.");
- }
- startDLSN = fromDLSN;
- }
-
- @VisibleForTesting
- public synchronized DLSN getStartDLSN() {
- return startDLSN;
- }
-
- public Future<Void> lockStream() {
- this.lockStream = true;
- return readHandler.lockStream();
- }
-
- private boolean checkClosedOrInError(String operation) {
- if (null == lastException.get()) {
- try {
- if (null != readHandler && null != getReadAheadReader()) {
- getReadAheadReader().checkLastException();
- }
-
- bkDistributedLogManager.checkClosedOrInError(operation);
- } catch (IOException exc) {
- setLastException(exc);
- }
- }
-
- if (lockStream) {
- try {
- readHandler.checkReadLock();
- } catch (IOException ex) {
- setLastException(ex);
- }
- }
-
- if (null != lastException.get()) {
- LOG.trace("Cancelling pending reads");
- cancelAllPendingReads(lastException.get());
- return true;
- }
-
- return false;
- }
-
- private void setLastException(IOException exc) {
- lastException.compareAndSet(null, exc);
- }
-
- @Override
- public String getStreamName() {
- return bkDistributedLogManager.getStreamName();
- }
-
- /**
- * @return A promise that when satisfied will contain the Log Record with its DLSN.
- */
- @Override
- public synchronized Future<LogRecordWithDLSN> readNext() {
- return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
- }
-
- public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
- return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
- long waitTime,
- TimeUnit timeUnit) {
- return readInternal(numEntries, waitTime, timeUnit);
- }
-
- /**
- * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are
- * ready (1 to <i>numEntries</i>).
- *
- * @param numEntries
- * num entries to read
- * @return A promise that satisfied with a non-empty list of log records with their DLSN.
- */
- private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
- long deadlineTime,
- TimeUnit deadlineTimeUnit) {
- timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
- readNextDelayStopwatch.reset().start();
- final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
-
- if (null == readAheadReader) {
- try {
- final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
- getStreamName(),
- getStartDLSN(),
- bkDistributedLogManager.getConf(),
- readHandler,
- bkDistributedLogManager.getReaderEntryStore(),
- bkDistributedLogManager.getScheduler(),
- Ticker.systemTicker(),
- bkDistributedLogManager.alertStatsLogger);
- readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- try {
- readHandler.registerListener(readAheadEntryReader);
- readHandler.asyncStartFetchLogSegments()
- .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
- readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this);
- readAheadEntryReader.start(logSegments.getValue());
- return BoxedUnit.UNIT;
- }
- });
- } catch (Exception exc) {
- notifyOnError(exc);
- }
- }
-
- @Override
- public void onFailure(Throwable cause) {
- notifyOnError(cause);
- }
- });
- } catch (IOException ioe) {
- notifyOnError(ioe);
- }
- }
-
- if (checkClosedOrInError("readNext")) {
- readRequest.setException(lastException.get());
- } else {
- boolean queueEmpty = pendingRequests.isEmpty();
- pendingRequests.add(readRequest);
-
- if (queueEmpty) {
- scheduleBackgroundRead();
- }
- }
-
- readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
- readNextDelayStopwatch.reset().start();
-
- return readRequest.getPromise();
- }
-
- public synchronized void scheduleBackgroundRead() {
- // if the reader is already closed, we don't need to schedule background read again.
- if (null != closeFuture) {
- return;
- }
-
- long prevCount = scheduleCount.getAndIncrement();
- if (0 == prevCount) {
- scheduleDelayStopwatch.reset().start();
- executorService.submit(this);
- }
- }
-
- @Override
- public Future<Void> asyncClose() {
- // Cancel the idle reader timeout task, interrupting if necessary
- ReadCancelledException exception;
- Promise<Void> closePromise;
- synchronized (this) {
- if (null != closeFuture) {
- return closeFuture;
- }
- closePromise = closeFuture = new Promise<Void>();
- exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
- setLastException(exception);
- }
-
- // Do this after we have checked that the reader was not previously closed
- cancelIdleReaderTask();
-
- synchronized (scheduleLock) {
- if (null != backgroundScheduleTask) {
- backgroundScheduleTask.cancel(true);
- }
- }
-
- cancelAllPendingReads(exception);
-
- ReadAheadEntryReader readAheadReader = getReadAheadReader();
- if (null != readAheadReader) {
- readHandler.unregisterListener(readAheadReader);
- readAheadReader.removeStateChangeNotification(this);
- }
- Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
- readAheadReader,
- readHandler
- ).proxyTo(closePromise);
- return closePromise;
- }
-
- private void cancelAllPendingReads(Throwable throwExc) {
- for (PendingReadRequest promise : pendingRequests) {
- promise.setException(throwExc);
- }
- pendingRequests.clear();
- }
-
- synchronized boolean hasMoreRecords() throws IOException {
- if (null == readAheadReader) {
- return false;
- }
- if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
- return true;
- } else if (null != currentEntry) {
- nextRecord = currentEntry.nextRecord();
- return null != nextRecord;
- }
- return false;
- }
-
- private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
- if (null == readAheadReader) {
- return null;
- }
- if (null == currentEntry) {
- currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
- // no entry after reading from read ahead then return null
- if (null == currentEntry) {
- return null;
- }
- }
-
- LogRecordWithDLSN recordToReturn;
- if (null == nextRecord) {
- nextRecord = currentEntry.nextRecord();
- // no more records in current entry
- if (null == nextRecord) {
- currentEntry = null;
- return readNextRecord();
- }
- }
-
- // found a record to return and prefetch the next one
- recordToReturn = nextRecord;
- nextRecord = currentEntry.nextRecord();
- return recordToReturn;
- }
-
- @Override
- public void run() {
- synchronized(scheduleLock) {
- if (scheduleDelayStopwatch.isRunning()) {
- scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- }
-
- Stopwatch runTime = Stopwatch.createStarted();
- int iterations = 0;
- long scheduleCountLocal = scheduleCount.get();
- LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
- while(true) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
- }
-
- PendingReadRequest nextRequest = null;
- synchronized(this) {
- nextRequest = pendingRequests.peek();
-
- // Queue is empty, nothing to read, return
- if (null == nextRequest) {
- LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
- scheduleCount.set(0);
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
- return;
- }
-
- if (disableProcessingReadRequests) {
- LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
- return;
- }
- }
- lastProcessTime.reset().start();
-
- // If the oldest pending promise is interrupted then we must mark
- // the reader in error and abort all pending reads since we dont
- // know the last consumed read
- if (null == lastException.get()) {
- if (nextRequest.getPromise().isInterrupted().isDefined()) {
- setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
- nextRequest.getPromise().isInterrupted().get()));
- }
- }
-
- if (checkClosedOrInError("readNext")) {
- if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
- LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
- }
- backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
- return;
- }
-
- try {
- // Fail 10% of the requests when asked to simulate errors
- if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
- throw new IOException("Reader Simulated Exception");
- }
- LogRecordWithDLSN record;
- while (!nextRequest.hasReadEnoughRecords()) {
- // read single record
- do {
- record = readNextRecord();
- } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
- if (null == record) {
- break;
- } else {
- if (record.isEndOfStream() && !returnEndOfStreamRecord) {
- setLastException(new EndOfStreamException("End of Stream Reached for "
- + readHandler.getFullyQualifiedName()));
- break;
- }
-
- // gap detection
- if (recordPositionsContainsGap(record, lastPosition)) {
- bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
- if (positionGapDetectionEnabled) {
- throw new DLIllegalStateException("Gap detected between records at record = " + record);
- }
- }
- lastPosition = record.getLastPositionWithinLogSegment();
-
- nextRequest.addRecord(record);
- }
- };
- } catch (IOException exc) {
- setLastException(exc);
- if (!(exc instanceof LogNotFoundException)) {
- LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
- }
- continue;
- }
-
- if (nextRequest.hasReadRecords()) {
- long remainingWaitTime = nextRequest.getRemainingWaitTime();
- if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
- scheduleDelayStopwatch.reset().start();
- scheduleCount.set(0);
- // the request could still wait for more records
- backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit);
- return;
- }
-
- PendingReadRequest request = pendingRequests.poll();
- if (null != request && nextRequest == request) {
- request.complete();
- if (null != backgroundScheduleTask) {
- backgroundScheduleTask.cancel(true);
- backgroundScheduleTask = null;
- }
- } else {
- DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
- + nextRequest.records.get(0).getDlsn());
- nextRequest.setException(ise);
- if (null != request) {
- request.setException(ise);
- }
- // We should never get here as we should have exited the loop if
- // pendingRequests were empty
- bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
- nextRequest.records.get(0).getDlsn());
- setLastException(ise);
- }
- } else {
- if (0 == scheduleCountLocal) {
- LOG.trace("Schedule count dropping to zero", lastException.get());
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
- return;
- }
- scheduleCountLocal = scheduleCount.decrementAndGet();
- }
- }
- }
- }
-
- private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
- final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment());
- final boolean endOfStreamRecord = record.isEndOfStream();
- final boolean emptyLogSegment = (0 == lastPosition);
- final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1));
-
- return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment &&
- !positionIncreasedByOne;
- }
-
- /**
- * Triggered when the background activity encounters an exception
- */
- @Override
- public void notifyOnError(Throwable cause) {
- if (cause instanceof IOException) {
- setLastException((IOException) cause);
- } else {
- setLastException(new IOException(cause));
- }
- scheduleBackgroundRead();
- }
-
- /**
- * Triggered when the background activity completes an operation
- */
- @Override
- public void notifyOnOperationComplete() {
- scheduleBackgroundRead();
- }
-
- @VisibleForTesting
- void simulateErrors() {
- bkDistributedLogManager.getFailureInjector().injectErrors(true);
- }
-
- @VisibleForTesting
- synchronized void disableReadAheadLogSegmentsNotification() {
- readHandler.disableReadAheadLogSegmentsNotification();
- }
-
- @VisibleForTesting
- synchronized void disableProcessingReadRequests() {
- disableProcessingReadRequests = true;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 4963787..219c0cf 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -98,7 +98,7 @@ import java.util.concurrent.TimeUnit;
* <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
* See {@link BKAsyncLogWriter} for detail stats.
* <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`.
- * See {@link BKAsyncLogReaderDLSN} for detail stats.
+ * See {@link BKAsyncLogReader} for detail stats.
* <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under
* scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
* <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
@@ -955,7 +955,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
@Override
public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
Optional<String> subscriberId = Optional.absent();
- AsyncLogReader reader = new BKAsyncLogReaderDLSN(
+ AsyncLogReader reader = new BKAsyncLogReader(
this,
scheduler,
fromDLSN,
@@ -993,7 +993,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
}
- final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
+ final BKAsyncLogReader reader = new BKAsyncLogReader(
BKDistributedLogManager.this,
scheduler,
fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
@@ -1077,7 +1077,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
throws IOException {
LOG.info("Create sync reader starting from {}", fromDLSN);
checkClosedOrInError("getInputStream");
- return new BKSyncLogReaderDLSN(
+ return new BKSyncLogReader(
conf,
this,
fromDLSN,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
new file mode 100644
index 0000000..308f42a
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.twitter.distributedlog.exceptions.EndOfStreamException;
+import com.twitter.distributedlog.exceptions.IdleReaderException;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Synchronous {@link LogReader} that hands out records from the entries prefetched by a
+ * {@link ReadAheadEntryReader}.
+ *
+ * <p>{@link #readNext(boolean)} and {@link #readBulk(boolean, int)} are serialized on this
+ * object's monitor. The first error recorded -- one reported through
+ * {@link #notifyOnError(Throwable)}, end of stream, or idle-reader detection -- is kept and
+ * rethrown by every later {@link #readNext(boolean)} call.
+ */
+class BKSyncLogReader implements LogReader, AsyncNotification {
+
+    private final BKDistributedLogManager bkdlm;
+    private final BKLogReadHandler readHandler;
+    // first error seen by this reader; only ever set via compareAndSet(null, ...)
+    private final AtomicReference<IOException> readerException =
+            new AtomicReference<IOException>(null);
+    // timeout, in milliseconds, of a single blocking poll on the readahead reader
+    private final int maxReadAheadWaitTime;
+    // set by the first asyncClose() call; accessed under this object's monitor
+    private Promise<Void> closeFuture;
+    private final Optional<Long> startTransactionId;
+    // false until a record with transaction id >= startTransactionId has been reached
+    private boolean positioned = false;
+    // entry whose records are currently being handed out; null once it is exhausted
+    private Entry.Reader currentEntry = null;
+
+    // readahead reader
+    ReadAheadEntryReader readAheadReader = null;
+
+    // idle reader settings
+    private final boolean shouldCheckIdleReader;
+    private final int idleErrorThresholdMillis;
+
+    // Stats
+    private final Counter idleReaderError;
+
+    /**
+     * Create the reader and start readahead from {@code startDLSN}.
+     *
+     * @param conf supplies the readahead wait time and the idle-reader error threshold
+     * @param bkdlm log manager used to create the read handler and the readahead reader
+     * @param startDLSN position handed to the readahead reader as its starting point
+     * @param startTransactionId if present, records with a smaller transaction id are skipped
+     *        before the first record is returned
+     * @param statsLogger parent stats scope; counters are registered under {@code sync_reader}
+     * @throws IOException if the reader cannot be set up
+     */
+    BKSyncLogReader(DistributedLogConfiguration conf,
+                    BKDistributedLogManager bkdlm,
+                    DLSN startDLSN,
+                    Optional<Long> startTransactionId,
+                    StatsLogger statsLogger) throws IOException {
+        this.bkdlm = bkdlm;
+        this.readHandler = bkdlm.createReadHandler(
+                Optional.<String>absent(),
+                this,
+                true);
+        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
+        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
+        // idle checking is off for non-positive thresholds and for Integer.MAX_VALUE
+        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
+        this.startTransactionId = startTransactionId;
+
+        // start readahead
+        startReadAhead(startDLSN);
+        // without a start transaction id there is nothing to skip
+        if (!startTransactionId.isPresent()) {
+            positioned = true;
+        }
+
+        // Stats
+        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
+        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
+    }
+
+    /**
+     * Create the readahead reader, register it as a listener on the read handler, and start it
+     * once the log segment list has been fetched.
+     *
+     * @param startDLSN position the readahead reader starts from
+     * @throws IOException declared for readahead setup failures
+     */
+    private void startReadAhead(DLSN startDLSN) throws IOException {
+        readAheadReader = new ReadAheadEntryReader(
+                bkdlm.getStreamName(),
+                startDLSN,
+                bkdlm.getConf(),
+                readHandler,
+                bkdlm.getReaderEntryStore(),
+                bkdlm.getScheduler(),
+                Ticker.systemTicker(),
+                bkdlm.alertStatsLogger);
+        readHandler.registerListener(readAheadReader);
+        // NOTE(review): a failure of this future is not handled here -- confirm that fetch
+        // failures reach this reader through the AsyncNotification given to the read handler
+        readHandler.asyncStartFetchLogSegments()
+                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                        readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
+                        readAheadReader.start(logSegments.getValue());
+                        return BoxedUnit.UNIT;
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
+    @VisibleForTesting
+    BKLogReadHandler getReadHandler() {
+        return readHandler;
+    }
+
+    /**
+     * Fetch the next entry from the readahead reader.
+     *
+     * @param nonBlocking when true, poll once with a zero timeout; otherwise keep polling with
+     *        {@code maxReadAheadWaitTime} until an entry arrives, readahead reports that it is
+     *        caught up, or an error has been recorded
+     * @return the next entry, or null if none became available
+     * @throws IOException if polling the readahead reader fails
+     */
+    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
+        Entry.Reader entry = null;
+        if (nonBlocking) {
+            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+        } else {
+            while (!readAheadReader.isReadAheadCaughtUp()
+                    && null == readerException.get()
+                    && null == entry) {
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+            }
+            if (null != entry) {
+                return entry;
+            }
+            // reader is caught up: poll one more time, then give up with null
+            if (readAheadReader.isReadAheadCaughtUp()
+                    && null == readerException.get()) {
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+            }
+            return entry;
+        }
+    }
+
+    /**
+     * Count the idle error, record it as the reader's error if none is recorded yet, and throw it.
+     *
+     * @throws IdleReaderException always
+     */
+    private void markReaderAsIdle() throws IdleReaderException {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for more than " + idleErrorThresholdMillis + " ms");
+        readerException.compareAndSet(null, ire);
+        throw ire;
+    }
+
+    /**
+     * Read the next user record.
+     *
+     * @param nonBlocking passed through to the entry fetch; see {@link #readNextEntry(boolean)}
+     * @return the next record, or null if none is available
+     * @throws IOException the previously recorded error if there is one, an
+     *         {@link EndOfStreamException} when the end-of-stream record is reached, or an
+     *         {@link IdleReaderException} when no record was returned and the reader is idle
+     */
+    @Override
+    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
+            throws IOException {
+        IOException recorded = readerException.get();
+        if (null != recorded) {
+            throw recorded;
+        }
+        LogRecordWithDLSN record = doReadNext(nonBlocking);
+        // no record is returned, check if the reader becomes idle
+        if (null == record && shouldCheckIdleReader) {
+            if (readAheadReader.getNumCachedEntries() <= 0 &&
+                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                markReaderAsIdle();
+            }
+        }
+        return record;
+    }
+
+    /**
+     * Pull records out of the current entry (fetching new entries as needed) until one can be
+     * returned to the caller. Control records are skipped, and so are records below
+     * {@code startTransactionId} while the reader is not yet positioned.
+     *
+     * @return the next returnable record, or null if no entry is available
+     * @throws IOException on fetch failure, or {@link EndOfStreamException} at end of stream
+     */
+    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
+        LogRecordWithDLSN record = null;
+
+        do {
+            // fetch one record until we don't find any entry available in the readahead cache
+            while (null == record) {
+                if (null == currentEntry) {
+                    currentEntry = readNextEntry(nonBlocking);
+                    if (null == currentEntry) {
+                        return null;
+                    }
+                }
+                record = currentEntry.nextRecord();
+                if (null == record) {
+                    // current entry is exhausted; move on to the next one
+                    currentEntry = null;
+                }
+            }
+
+            // check if we reached the end of stream
+            if (record.isEndOfStream()) {
+                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
+                        + readHandler.getFullyQualifiedName());
+                readerException.compareAndSet(null, eos);
+                throw eos;
+            }
+            // skip control records
+            if (record.isControl()) {
+                record = null;
+                continue;
+            }
+            if (!positioned) {
+                // positioned is only false when startTransactionId is present (see constructor)
+                if (record.getTransactionId() < startTransactionId.get()) {
+                    record = null;
+                    continue;
+                } else {
+                    positioned = true;
+                    break;
+                }
+            } else {
+                break;
+            }
+        } while (true);
+        return record;
+    }
+
+    /**
+     * Read up to {@code numLogRecords} records by calling {@link #readNext(boolean)} repeatedly.
+     *
+     * @param nonBlocking passed through to each {@link #readNext(boolean)} call
+     * @param numLogRecords maximum number of records to return; when it is zero or negative
+     *        nothing is read and an empty list is returned
+     * @return the records read, in order; shorter than requested if no more were available
+     * @throws IOException if any underlying {@link #readNext(boolean)} call throws
+     */
+    @Override
+    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
+            throws IOException {
+        LinkedList<LogRecordWithDLSN> retList =
+                new LinkedList<LogRecordWithDLSN>();
+
+        // check the budget before each read, so a non-positive request consumes no record
+        while (retList.size() < numLogRecords) {
+            LogRecordWithDLSN record = readNext(nonBlocking);
+            if (null == record) {
+                break;
+            }
+            retList.add(record);
+        }
+        return retList;
+    }
+
+    /**
+     * Close the readahead reader and the read handler.
+     *
+     * @return future for the close; repeated calls return the future created by the first call
+     */
+    @Override
+    public Future<Void> asyncClose() {
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        readHandler.unregisterListener(readAheadReader);
+        readAheadReader.removeStateChangeNotification(this);
+        Utils.closeSequence(bkdlm.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    /**
+     * Blocking variant of {@link #asyncClose()}.
+     */
+    @Override
+    public void close() throws IOException {
+        FutureUtils.result(asyncClose());
+    }
+
+    //
+    // Notification From ReadHandler
+    //
+
+    /**
+     * Record {@code cause} as the reader's error unless one is already recorded; causes that
+     * are not {@link IOException}s are wrapped in one.
+     */
+    @Override
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            readerException.compareAndSet(null, (IOException) cause);
+        } else {
+            readerException.compareAndSet(null, new IOException(cause));
+        }
+    }
+
+    @Override
+    public void notifyOnOperationComplete() {
+        // no-op
+    }
+}