You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:29 UTC
[15/31] incubator-distributedlog git commit: DL-159: ReadAhead
Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader
interface
DL-159: ReadAhead Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader interface
Provide a new ReadAhead reader built on the log segment entry reader interface. It reads batches of entries in parallel, rather than reading entries one batch at a time. This helps mitigate the slow bookie problem.
The core change is the new ReadAheadEntryReader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7a977972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7a977972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7a977972
Branch: refs/heads/master
Commit: 7a977972200da9e86f4557bb002ce16cb60d236a
Parents: 27c04f3
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Dec 28 15:09:38 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:11:40 2016 -0800
----------------------------------------------------------------------
.../distributedlog/BKAsyncLogReaderDLSN.java | 198 ++--
.../distributedlog/BKDistributedLogManager.java | 54 +-
.../distributedlog/BKLogReadHandler.java | 90 +-
.../distributedlog/BKSyncLogReaderDLSN.java | 80 +-
.../java/com/twitter/distributedlog/Entry.java | 14 +
.../twitter/distributedlog/EntryPosition.java | 63 ++
.../distributedlog/EnvelopedEntryReader.java | 10 +
.../com/twitter/distributedlog/MaxTxId.java | 2 +-
.../distributedlog/ReadAheadEntryReader.java | 966 +++++++++++++++++++
.../logsegment/BKLogSegmentEntryReader.java | 118 ++-
.../impl/logsegment/BKLogSegmentEntryStore.java | 127 +++
.../injector/AsyncFailureInjector.java | 7 +-
.../injector/AsyncRandomFailureInjector.java | 15 +-
.../logsegment/LogSegmentEntryReader.java | 21 +
.../logsegment/LogSegmentEntryStore.java | 4 +-
.../rate/MovingAverageRateFactory.java | 2 +-
.../readahead/ReadAheadWorker.java | 12 +-
.../distributedlog/util/OrderedScheduler.java | 8 +
.../distributedlog/TestAsyncReaderLock.java | 2 +-
.../distributedlog/TestAsyncReaderWriter.java | 39 +-
.../distributedlog/TestBKSyncLogReader.java | 5 +-
.../distributedlog/TestEntryPosition.java | 59 ++
.../TestNonBlockingReadsMultiReader.java | 5 +-
.../twitter/distributedlog/TestReadAhead.java | 158 ---
.../TestReadAheadEntryReader.java | 423 ++++++++
.../com/twitter/distributedlog/TestReader.java | 4 +
.../distributedlog/TestRollLogSegments.java | 12 +-
.../twitter/distributedlog/TestTruncate.java | 1 -
.../logsegment/TestBKLogSegmentEntryReader.java | 38 +-
.../LogSegmentIsTruncatedException.java | 33 +
.../src/main/thrift/service.thrift | 2 +
31 files changed, 2114 insertions(+), 458 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 2ca064c..e347012 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
@@ -27,9 +28,7 @@ import com.twitter.distributedlog.exceptions.IdleReaderException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.ReadCancelledException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
-import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
@@ -48,10 +47,12 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
/**
* BookKeeper based {@link AsyncLogReader} implementation.
@@ -83,7 +84,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
};
protected final BKDistributedLogManager bkDistributedLogManager;
- protected final BKLogReadHandler bkLedgerManager;
+ protected final BKLogReadHandler readHandler;
private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
private final ScheduledExecutorService executorService;
private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -92,19 +93,19 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
final private Stopwatch scheduleDelayStopwatch;
final private Stopwatch readNextDelayStopwatch;
private DLSN startDLSN;
- private boolean readAheadStarted = false;
+ private ReadAheadEntryReader readAheadReader = null;
private int lastPosition = 0;
private final boolean positionGapDetectionEnabled;
private final int idleErrorThresholdMillis;
final ScheduledFuture<?> idleReaderTimeoutTask;
private ScheduledFuture<?> backgroundScheduleTask = null;
+ // last process time
+ private final Stopwatch lastProcessTime;
protected Promise<Void> closeFuture = null;
private boolean lockStream = false;
- private boolean disableReadAheadLogSegmentsNotification = false;
-
private final boolean returnEndOfStreamRecord;
private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
@@ -122,7 +123,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
private LogRecordWithDLSN nextRecord = null;
// Failure Injector
- private final AsyncFailureInjector failureInjector;
private boolean disableProcessingReadRequests = false;
// Stats
@@ -198,7 +198,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
void complete() {
if (LOG.isTraceEnabled()) {
- LOG.trace("{} : Satisfied promise with {} records", bkLedgerManager.getFullyQualifiedName(), records.size());
+ LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
}
delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
Stopwatch stopwatch = Stopwatch.createStarted();
@@ -212,12 +212,11 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
DLSN startDLSN,
Optional<String> subscriberId,
boolean returnEndOfStreamRecord,
- boolean deserializeRecordSet,
StatsLogger statsLogger) {
this.bkDistributedLogManager = bkdlm;
this.executorService = executorService;
- this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
- this, deserializeRecordSet, true);
+ this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+ this, true);
LOG.debug("Starting async reader at {}", startDLSN);
this.startDLSN = startDLSN;
this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -226,16 +225,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
this.returnEndOfStreamRecord = returnEndOfStreamRecord;
- // Failure Injection
- this.failureInjector = AsyncRandomFailureInjector.newBuilder()
- .injectDelays(bkdlm.getConf().getEIInjectReadAheadDelay(),
- bkdlm.getConf().getEIInjectReadAheadDelayPercent(),
- bkdlm.getConf().getEIInjectMaxReadAheadDelayMs())
- .injectErrors(false, 10)
- .injectStops(bkdlm.getConf().getEIInjectReadAheadStall(), 10)
- .injectCorruption(bkdlm.getConf().getEIInjectReadAheadBrokenEntries())
- .build();
-
// Stats
StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
@@ -252,6 +241,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
// Lock the stream if requested. The lock will be released when the reader is closed.
this.lockStream = false;
this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
+ this.lastProcessTime = Stopwatch.createStarted();
}
private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
@@ -276,7 +266,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
return;
}
- ReadAheadCache cache = bkLedgerManager.getReadAheadCache();
+ ReadAheadEntryReader readAheadReader = getReadAheadReader();
// read request has been idle
// - cache has records but read request are idle,
@@ -284,32 +274,27 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
// - cache is empty and readahead is idle (no records added for a long time)
idleReaderCheckIdleReadAheadCount.inc();
try {
- if (!hasMoreRecords(cache)
- && !cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+ if (null == readAheadReader || (!hasMoreRecords() &&
+ readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+ markReaderAsIdle();
return;
+ } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+ markReaderAsIdle();;
}
} catch (IOException e) {
- // we encountered exceptions on checking more records
setLastException(e);
return;
}
-
- idleReaderError.inc();
- IdleReaderException ire = new IdleReaderException("Reader on stream "
- + bkLedgerManager.getFullyQualifiedName()
- + " is idle for " + idleErrorThresholdMillis +" ms");
- setLastException(ire);
- // cancel all pending reads directly rather than notifying on error
- // because idle reader could happen on idle read requests that usually means something wrong
- // in scheduling reads
- cancelAllPendingReads(ire);
}
}, period, period, TimeUnit.MILLISECONDS);
}
-
return null;
}
+ synchronized ReadAheadEntryReader getReadAheadReader() {
+ return readAheadReader;
+ }
+
void cancelIdleReaderTask() {
// Do this after we have checked that the reader was not previously closed
try {
@@ -317,12 +302,24 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
idleReaderTimeoutTask.cancel(true);
}
} catch (Exception exc) {
- LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
+ LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName());
}
}
+ private void markReaderAsIdle() {
+ idleReaderError.inc();
+ IdleReaderException ire = new IdleReaderException("Reader on stream "
+ + readHandler.getFullyQualifiedName()
+ + " is idle for " + idleErrorThresholdMillis +" ms");
+ setLastException(ire);
+ // cancel all pending reads directly rather than notifying on error
+ // because idle reader could happen on idle read requests that usually means something wrong
+ // in scheduling reads
+ cancelAllPendingReads(ire);
+ }
+
protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
- if (readAheadStarted) {
+ if (null != readAheadReader) {
throw new UnexpectedException("Could't reset from dlsn after reader already starts reading.");
}
startDLSN = fromDLSN;
@@ -335,14 +332,14 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
public Future<Void> lockStream() {
this.lockStream = true;
- return bkLedgerManager.lockStream();
+ return readHandler.lockStream();
}
private boolean checkClosedOrInError(String operation) {
if (null == lastException.get()) {
try {
- if (null != bkLedgerManager && null != bkLedgerManager.readAheadWorker) {
- bkLedgerManager.readAheadWorker.checkClosedOrInError();
+ if (null != readHandler && null != getReadAheadReader()) {
+ getReadAheadReader().checkLastException();
}
bkDistributedLogManager.checkClosedOrInError(operation);
@@ -353,7 +350,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
if (lockStream) {
try {
- bkLedgerManager.checkReadLock();
+ readHandler.checkReadLock();
} catch (IOException ex) {
setLastException(ex);
}
@@ -411,28 +408,44 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
readNextDelayStopwatch.reset().start();
final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
- if (!readAheadStarted) {
- bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- try {
- bkLedgerManager.startReadAhead(
- new LedgerReadPosition(getStartDLSN()),
- failureInjector);
- if (disableReadAheadLogSegmentsNotification) {
- bkLedgerManager.disableReadAheadLogSegmentsNotification();
+ if (null == readAheadReader) {
+ try {
+ final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+ getStreamName(),
+ getStartDLSN(),
+ bkDistributedLogManager.getConf(),
+ readHandler,
+ bkDistributedLogManager.getReaderEntryStore(),
+ bkDistributedLogManager.getScheduler(),
+ Ticker.systemTicker(),
+ bkDistributedLogManager.alertStatsLogger);
+ readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ try {
+ readHandler.registerListener(readAheadEntryReader);
+ readHandler.asyncStartFetchLogSegments()
+ .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+ readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this);
+ readAheadEntryReader.start(logSegments.getValue());
+ return BoxedUnit.UNIT;
+ }
+ });
+ } catch (Exception exc) {
+ notifyOnError(exc);
}
- } catch (Exception exc) {
- notifyOnError(exc);
}
- }
- @Override
- public void onFailure(Throwable cause) {
- notifyOnError(cause);
- }
- });
- readAheadStarted = true;
+ @Override
+ public void onFailure(Throwable cause) {
+ notifyOnError(cause);
+ }
+ });
+ } catch (IOException ioe) {
+ notifyOnError(ioe);
+ }
}
if (checkClosedOrInError("readNext")) {
@@ -475,7 +488,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
return closeFuture;
}
closePromise = closeFuture = new Promise<Void>();
- exception = new ReadCancelledException(bkLedgerManager.getFullyQualifiedName(), "Reader was closed");
+ exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
setLastException(exception);
}
@@ -490,7 +503,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
cancelAllPendingReads(exception);
- FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
+ ReadAheadEntryReader readAheadReader = getReadAheadReader();
+ if (null != readAheadReader) {
+ readHandler.unregisterListener(readAheadReader);
+ readAheadReader.removeStateChangeNotification(this);
+ }
+ Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+ readAheadReader,
+ readHandler
+ ).proxyTo(closePromise);
return closePromise;
}
@@ -501,25 +522,26 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
pendingRequests.clear();
}
- boolean hasMoreRecords() throws IOException {
- return hasMoreRecords(bkLedgerManager.readAheadCache);
- }
-
- private synchronized boolean hasMoreRecords(ReadAheadCache cache) throws IOException {
- if (cache.getNumCachedEntries() > 0 || null != nextRecord) {
+ synchronized boolean hasMoreRecords() throws IOException {
+ if (null == readAheadReader) {
+ return false;
+ }
+ if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
return true;
} else if (null != currentEntry) {
nextRecord = currentEntry.nextRecord();
return null != nextRecord;
- } else {
- return false;
}
+ return false;
}
private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+ if (null == readAheadReader) {
+ return null;
+ }
if (null == currentEntry) {
- currentEntry = bkLedgerManager.getNextReadAheadEntry();
- // no current entry after reading from read head then return null
+ currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+ // no entry after reading from read ahead then return null
if (null == currentEntry) {
return null;
}
@@ -551,10 +573,10 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
Stopwatch runTime = Stopwatch.createStarted();
int iterations = 0;
long scheduleCountLocal = scheduleCount.get();
- LOG.debug("{}: Scheduled Background Reader", bkLedgerManager.getFullyQualifiedName());
+ LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
while(true) {
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: Executing Iteration: {}", bkLedgerManager.getFullyQualifiedName(), iterations++);
+ LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
}
PendingReadRequest nextRequest = null;
@@ -563,31 +585,32 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
// Queue is empty, nothing to read, return
if (null == nextRequest) {
- LOG.trace("{}: Queue Empty waiting for Input", bkLedgerManager.getFullyQualifiedName());
+ LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
scheduleCount.set(0);
backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
return;
}
if (disableProcessingReadRequests) {
- LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
+ LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
return;
}
}
+ lastProcessTime.reset().start();
// If the oldest pending promise is interrupted then we must mark
// the reader in error and abort all pending reads since we dont
// know the last consumed read
if (null == lastException.get()) {
if (nextRequest.getPromise().isInterrupted().isDefined()) {
- setLastException(new DLInterruptedException("Interrupted on reading " + bkLedgerManager.getFullyQualifiedName() + " : ",
+ setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
nextRequest.getPromise().isInterrupted().get()));
}
}
if (checkClosedOrInError("readNext")) {
if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
- LOG.warn("{}: Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get());
+ LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
}
backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
return;
@@ -595,7 +618,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
try {
// Fail 10% of the requests when asked to simulate errors
- if (failureInjector.shouldInjectErrors()) {
+ if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
throw new IOException("Reader Simulated Exception");
}
LogRecordWithDLSN record;
@@ -609,15 +632,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
} else {
if (record.isEndOfStream() && !returnEndOfStreamRecord) {
setLastException(new EndOfStreamException("End of Stream Reached for "
- + bkLedgerManager.getFullyQualifiedName()));
+ + readHandler.getFullyQualifiedName()));
break;
}
// gap detection
if (recordPositionsContainsGap(record, lastPosition)) {
- bkDistributedLogManager.raiseAlert("Gap detected between records at dlsn = {}", record.getDlsn());
+ bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
if (positionGapDetectionEnabled) {
- throw new DLIllegalStateException("Gap detected between records at dlsn = " + record.getDlsn());
+ throw new DLIllegalStateException("Gap detected between records at record = " + record);
}
}
lastPosition = record.getLastPositionWithinLogSegment();
@@ -628,7 +651,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
} catch (IOException exc) {
setLastException(exc);
if (!(exc instanceof LogNotFoundException)) {
- LOG.warn("{} : read with skip Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get());
+ LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
}
continue;
}
@@ -709,13 +732,12 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
@VisibleForTesting
void simulateErrors() {
- failureInjector.injectErrors(true);
+ bkDistributedLogManager.getFailureInjector().injectErrors(true);
}
@VisibleForTesting
synchronized void disableReadAheadLogSegmentsNotification() {
- disableReadAheadLogSegmentsNotification = true;
- bkLedgerManager.disableReadAheadLogSegmentsNotification();
+ readHandler.disableReadAheadLogSegmentsNotification();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index cd3f359..4963787 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -34,6 +34,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
import com.twitter.distributedlog.function.GetVersionedValueFunction;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
import com.twitter.distributedlog.metadata.LogMetadataForReader;
import com.twitter.distributedlog.metadata.LogMetadataForWriter;
import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
@@ -141,7 +145,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
private final FeatureProvider featureProvider;
private final StatsLogger statsLogger;
private final StatsLogger perLogStatsLogger;
- private final AlertStatsLogger alertStatsLogger;
+ final AlertStatsLogger alertStatsLogger;
// log stream metadata stores
private final LogStreamMetadataStore writerMetadataStore;
@@ -159,6 +163,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
private final boolean ownWriterBKC;
private final BookKeeperClientBuilder readerBKCBuilder;
private final BookKeeperClient readerBKC;
+ private LogSegmentEntryStore readerEntryStore = null;
private final boolean ownReaderBKC;
//
@@ -176,6 +181,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
private final PendingReaders pendingReaders;
private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
+ // Failure Injector
+ private final AsyncFailureInjector failureInjector;
+
/**
* Create a DLM for testing.
*
@@ -303,6 +311,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
this.ledgerAllocator = ledgerAllocator;
this.writeLimiter = writeLimiter;
+ // Failure Injection
+ this.failureInjector = AsyncRandomFailureInjector.newBuilder()
+ .injectDelays(conf.getEIInjectReadAheadDelay(),
+ conf.getEIInjectReadAheadDelayPercent(),
+ conf.getEIInjectMaxReadAheadDelayMs())
+ .injectErrors(false, 10)
+ .injectStops(conf.getEIInjectReadAheadStall(), 10)
+ .injectCorruption(conf.getEIInjectReadAheadBrokenEntries())
+ .build();
+
if (null == writerMetadataStore) {
this.writerMetadataStore = new ZKLogStreamMetadataStore(
clientId,
@@ -413,6 +431,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
return this.readerBKC;
}
+ synchronized LogSegmentEntryStore getReaderEntryStore() throws IOException {
+ if (null == readerEntryStore) {
+ readerEntryStore = new BKLogSegmentEntryStore(
+ conf,
+ readerBKC.get(),
+ scheduler,
+ statsLogger,
+ failureInjector);
+ }
+ return this.readerEntryStore;
+ }
+
@VisibleForTesting
FuturePool getReaderFuturePool() {
return this.readerFuturePool;
@@ -423,6 +453,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
return this.featureProvider;
}
+ AsyncFailureInjector getFailureInjector() {
+ return this.failureInjector;
+ }
+
private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
boolean create, LogSegmentListener listener) {
if (null == readHandlerForListener && create) {
@@ -432,7 +466,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
readHandlerForListener.asyncStartFetchLogSegments();
return readHandlerForListener;
}
- readHandlerForListener.registerListener(listener);
+ if (null != readHandlerForListener && null != listener) {
+ readHandlerForListener.registerListener(listener);
+ }
return readHandlerForListener;
}
@@ -493,13 +529,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
return createReadHandler(
subscriberId,
null,
- true, /* deserialize record set */
isHandleForReading);
}
synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
AsyncNotification notification,
- boolean deserializeRecordSet,
boolean isHandleForReading) {
LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier);
return new BKLogReadHandler(
@@ -511,15 +545,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
readerMetadataStore,
logSegmentMetadataCache,
scheduler,
- readAheadScheduler,
alertStatsLogger,
- readAheadExceptionsLogger,
statsLogger,
perLogStatsLogger,
clientId,
notification,
- isHandleForReading,
- deserializeRecordSet);
+ isHandleForReading);
}
// Create Ledger Allocator
@@ -930,7 +961,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
fromDLSN,
subscriberId,
false,
- dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
pendingReaders.add(reader);
return Future.value(reader);
@@ -969,7 +999,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
subscriberId,
false,
- dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
pendingReaders.add(reader);
final Future<Void> lockFuture = reader.lockStream();
@@ -1046,15 +1075,14 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
throws IOException {
- LOG.info("Create async reader starting from {}", fromDLSN);
+ LOG.info("Create sync reader starting from {}", fromDLSN);
checkClosedOrInError("getInputStream");
- LogReader reader = new BKSyncLogReaderDLSN(
+ return new BKSyncLogReaderDLSN(
conf,
this,
fromDLSN,
fromTxnId,
statsLogger);
- return reader;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 9cfe1a6..67c584c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -36,14 +35,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.readahead.ReadAheadWorker;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
@@ -111,13 +106,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
protected final LogMetadataForReader logMetadataForReader;
- protected final ReadAheadCache readAheadCache;
protected final LedgerHandleCache handleCache;
- protected final OrderedScheduler readAheadExecutor;
protected final DynamicDistributedLogConfiguration dynConf;
- protected ReadAheadWorker readAheadWorker = null;
- private final boolean isHandleForReading;
private final Optional<String> subscriberId;
private DistributedLock readLock;
@@ -134,10 +125,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
new Versioned<List<LogSegmentMetadata>>(null, Version.NEW);
// stats
- private final AlertStatsLogger alertStatsLogger;
- private final StatsLogger handlerStatsLogger;
private final StatsLogger perLogStatsLogger;
- private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
/**
* Construct a Bookkeeper journal manager.
@@ -150,15 +138,12 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
LogStreamMetadataStore streamMetadataStore,
LogSegmentMetadataCache metadataCache,
OrderedScheduler scheduler,
- OrderedScheduler readAheadExecutor,
AlertStatsLogger alertStatsLogger,
- ReadAheadExceptionsLogger readAheadExceptionsLogger,
StatsLogger statsLogger,
StatsLogger perLogStatsLogger,
String clientId,
AsyncNotification readerStateNotification,
- boolean isHandleForReading,
- boolean deserializeRecordSet) {
+ boolean isHandleForReading) {
super(logMetadata,
conf,
bkcBuilder,
@@ -170,13 +155,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
clientId);
this.logMetadataForReader = logMetadata;
this.dynConf = dynConf;
- this.readAheadExecutor = readAheadExecutor;
- this.alertStatsLogger = alertStatsLogger;
this.perLogStatsLogger =
isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE;
- this.handlerStatsLogger =
- BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger);
- this.readAheadExceptionsLogger = readAheadExceptionsLogger;
this.readerStateNotification = readerStateNotification;
handleCache = LedgerHandleCache.newBuilder()
@@ -184,16 +164,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
.conf(conf)
.statsLogger(statsLogger)
.build();
- readAheadCache = new ReadAheadCache(
- getFullyQualifiedName(),
- alertStatsLogger,
- readerStateNotification,
- dynConf.getReadAheadMaxRecords(),
- deserializeRecordSet,
- Ticker.systemTicker());
-
this.subscriberId = subscriberId;
- this.isHandleForReading = isHandleForReading;
}
@VisibleForTesting
@@ -290,16 +261,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
}
lockToClose = readLock;
}
- return Utils.closeSequence(scheduler, readAheadWorker, lockToClose)
+ return Utils.closeSequence(scheduler, lockToClose)
.flatMap(new AbstractFunction1<Void, Future<Void>>() {
@Override
public Future<Void> apply(Void result) {
- if (null != readAheadCache) {
- readAheadCache.clear();
- }
- if (null != readAheadWorker) {
- unregisterListener(readAheadWorker);
- }
if (null != handleCache) {
handleCache.clear();
}
@@ -361,57 +326,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
});
}
- public void startReadAhead(LedgerReadPosition startPosition,
- AsyncFailureInjector failureInjector) {
- if (null == readAheadWorker) {
- readAheadWorker = new ReadAheadWorker(
- conf,
- dynConf,
- logMetadataForReader,
- this,
- readAheadExecutor,
- handleCache,
- startPosition,
- readAheadCache,
- isHandleForReading,
- readAheadExceptionsLogger,
- handlerStatsLogger,
- perLogStatsLogger,
- alertStatsLogger,
- failureInjector,
- readerStateNotification);
- registerListener(readAheadWorker);
- // start the readahead worker after the log segments are fetched
- asyncStartFetchLogSegments().map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
- readAheadWorker.start(logSegments.getValue());
- return BoxedUnit.UNIT;
- }
- });
- }
- }
-
- public boolean isReadAheadCaughtUp() {
- return null != readAheadWorker && readAheadWorker.isCaughtUp();
- }
-
- public LedgerHandleCache getHandleCache() {
- return handleCache;
- }
-
- public Entry.Reader getNextReadAheadEntry() throws IOException {
- return readAheadCache.getNextReadAheadEntry();
- }
-
- public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
- return readAheadCache.getNextReadAheadEntry(waitTime, waitTimeUnit);
- }
-
- public ReadAheadCache getReadAheadCache() {
- return readAheadCache;
- }
-
@VisibleForTesting
void disableReadAheadLogSegmentsNotification() {
logSegmentsNotificationDisabled = true;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index 0f6db75..adf49a1 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -19,14 +19,18 @@ package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.util.LinkedList;
@@ -39,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
+ private final BKDistributedLogManager bkdlm;
private final BKLogReadHandler readHandler;
private final AtomicReference<IOException> readerException =
new AtomicReference<IOException>(null);
@@ -48,6 +53,9 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
private boolean positioned = false;
private Entry.Reader currentEntry = null;
+ // readahead reader
+ ReadAheadEntryReader readAheadReader = null;
+
// idle reader settings
private final boolean shouldCheckIdleReader;
private final int idleErrorThresholdMillis;
@@ -59,19 +67,19 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
BKDistributedLogManager bkdlm,
DLSN startDLSN,
Optional<Long> startTransactionId,
- StatsLogger statsLogger) {
+ StatsLogger statsLogger) throws IOException {
+ this.bkdlm = bkdlm;
this.readHandler = bkdlm.createReadHandler(
Optional.<String>absent(),
this,
- conf.getDeserializeRecordSetOnReads(),
true);
this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
this.startTransactionId = startTransactionId;
- readHandler.startReadAhead(
- new LedgerReadPosition(startDLSN),
- AsyncFailureInjector.NULL);
+
+ // start readahead
+ startReadAhead(startDLSN);
if (!startTransactionId.isPresent()) {
positioned = true;
}
@@ -81,32 +89,55 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
}
+    /**
+     * Create the readahead reader for {@code startDLSN}, register it as a listener on the read
+     * handler, and start it with the fetched log segments once the fetch completes.
+     */
+    private void startReadAhead(DLSN startDLSN) throws IOException {
+        readAheadReader = new ReadAheadEntryReader(
+                bkdlm.getStreamName(), startDLSN, bkdlm.getConf(), readHandler,
+                bkdlm.getReaderEntryStore(), bkdlm.getScheduler(),
+                Ticker.systemTicker(), bkdlm.alertStatsLogger);
+        readHandler.registerListener(readAheadReader);
+        // NOTE(review): the mapped future is discarded; confirm a failed fetch is surfaced elsewhere
+        readHandler.asyncStartFetchLogSegments()
+                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                        readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this);
+                        readAheadReader.start(logSegments.getValue());
+                        return BoxedUnit.UNIT;
+                    }
+                });
+    }
+
+ @VisibleForTesting
+ ReadAheadEntryReader getReadAheadReader() {
+ return readAheadReader; // the reader created by startReadAhead; the field is declared as null before that
+ }
+
@VisibleForTesting
BKLogReadHandler getReadHandler() {
return readHandler;
}
- // reader is still catching up, waiting for next record
-
private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
Entry.Reader entry = null;
if (nonBlocking) {
- return readHandler.getNextReadAheadEntry();
+ return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
} else {
- while (!readHandler.isReadAheadCaughtUp()
+ while (!readAheadReader.isReadAheadCaughtUp()
&& null == readerException.get()
&& null == entry) {
- entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
- TimeUnit.MILLISECONDS);
+ entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
}
if (null != entry) {
return entry;
}
// reader is caught up
- if (readHandler.isReadAheadCaughtUp()
+ if (readAheadReader.isReadAheadCaughtUp()
&& null == readerException.get()) {
- entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime,
- TimeUnit.MILLISECONDS);
+ entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
}
return entry;
}
@@ -121,30 +152,24 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
throw ire;
}
-
@Override
public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
throws IOException {
if (null != readerException.get()) {
throw readerException.get();
}
-
LogRecordWithDLSN record = doReadNext(nonBlocking);
-
// no record is returned, check if the reader becomes idle
if (null == record && shouldCheckIdleReader) {
- ReadAheadCache cache = readHandler.getReadAheadCache();
- if (cache.getNumCachedEntries() <= 0 &&
- cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+ if (readAheadReader.getNumCachedEntries() <= 0 &&
+ readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
markReaderAsIdle();
}
}
-
return record;
}
- private synchronized LogRecordWithDLSN doReadNext(boolean nonBlocking)
- throws IOException {
+ private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
LogRecordWithDLSN record = null;
do {
@@ -217,7 +242,12 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
}
closeFuture = closePromise = new Promise<Void>();
}
- readHandler.asyncClose().proxyTo(closePromise);
+ readHandler.unregisterListener(readAheadReader);
+ readAheadReader.removeStateChangeNotification(this);
+ Utils.closeSequence(bkdlm.getScheduler(), true,
+ readAheadReader,
+ readHandler
+ ).proxyTo(closePromise);
return closePromise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
index b1bd701..bf315fc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
@@ -358,6 +358,20 @@ public class Entry {
public interface Reader {
/**
+ * Get the log segment sequence number.
+ *
+ * @return the log segment sequence number.
+ */
+ long getLSSN();
+
+ /**
+ * Return the entry id.
+ *
+ * @return the entry id.
+ */
+ long getEntryId();
+
+ /**
* Read next log record from this record set.
*
* @return next log record from this record set.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
new file mode 100644
index 0000000..0a15d29
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+/**
+ * The position of an entry, identified by log segment sequence number and entry id.
+ *
+ * <p>Every method, including {@link #toString()}, synchronizes on this instance so the
+ * (lssn, entryId) pair is always observed consistently while another thread advances it.
+ */
+class EntryPosition {
+
+    private long lssn;
+    private long entryId;
+
+    EntryPosition(long lssn, long entryId) {
+        this.lssn = lssn;
+        this.entryId = entryId;
+    }
+
+    public synchronized long getLogSegmentSequenceNumber() {
+        return lssn;
+    }
+
+    public synchronized long getEntryId() {
+        return entryId;
+    }
+
+    /**
+     * Move to (lssn, entryId) if that is strictly after the current position.
+     *
+     * @return true if the position changed; false if the target is not ahead of it.
+     */
+    public synchronized boolean advance(long lssn, long entryId) {
+        if (lssn < this.lssn || (lssn == this.lssn && entryId <= this.entryId)) {
+            return false;
+        }
+        this.lssn = lssn;
+        this.entryId = entryId;
+        return true;
+    }
+
+    // synchronized: lssn and entryId must be read as one pair, not across an advance()
+    @Override
+    public synchronized String toString() {
+        return "(" + lssn + ", " + entryId + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
index 79e4408..038bb18 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
@@ -57,6 +57,16 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream {
}
@Override
+ public long getLSSN() {
+ return logSegmentSeqNo;
+ }
+
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ @Override
public LogRecordWithDLSN nextRecord() throws IOException {
return reader.readOp();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index ed7218e..c3948df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -66,7 +66,7 @@ class MaxTxId {
}
public Versioned<Long> getVersionedData(long txId) {
- return new Versioned<Long>(Math.max(txId, currentMax), version);
+ return new Versioned<Long>(Math.max(txId, get()), version);
}
}