You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:34 UTC
[20/31] incubator-distributedlog git commit: DL-162: Use log segment
entry store interface
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
deleted file mode 100644
index 8529281..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ /dev/null
@@ -1,1503 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.readahead;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
-import com.twitter.distributedlog.AsyncNotification;
-import com.twitter.distributedlog.BKLogHandler;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LedgerDescriptor;
-import com.twitter.distributedlog.LedgerHandleCache;
-import com.twitter.distributedlog.LedgerReadPosition;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.LogReadException;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ReadAheadCache;
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.callback.ReadAheadCallback;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.metadata.LogMetadataForReader;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * ReadAhead Worker process readahead in asynchronous way. The whole readahead process are chained into
- * different phases:
- *
- * <p>
- * ScheduleReadAheadPhase: Schedule the readahead request based on previous state (e.g. whether to backoff).
- * After the readahead request was scheduled, the worker enters ReadAhead phase.
- * </p>
- * <p>
- * ReadAhead Phase: This phase is divided into several sub-phases. All the sub-phases are chained into the
- * execution flow. If errors happened during execution, the worker enters Exceptions Handling Phase.
- * <br>
- * CheckInProgressChangedPhase: check whether there is in progress ledgers changed and updated the metadata.
- * <br>
- * OpenLedgerPhase: open ledgers if necessary for following read requests.
- * <br>
- * ReadEntriesPhase: read entries from bookkeeper and fill the readahead cache.
- * <br>
- * After that, the worker goes back to Schedule Phase to schedule next readahead request.
- * </p>
- * <p>
- * Exceptions Handling Phase: Handle all the exceptions and properly schedule next readahead request.
- * </p>
- */
-public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncCloseable, LogSegmentListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class);
-
- private static final int BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS = 30;
- private static final int BKC_UNEXPECTED_EXCEPTION_THRESHOLD = 3;
-
- // Stream information
- private final String fullyQualifiedName;
- private final DistributedLogConfiguration conf;
- private final DynamicDistributedLogConfiguration dynConf;
- private final LogMetadataForReader logMetadata;
- private final BKLogHandler bkLedgerManager;
- private final boolean isHandleForReading;
- // Notification to notify readahead status
- protected final AsyncNotification notification;
-
- // resources
- protected final OrderedScheduler scheduler;
- private final LedgerHandleCache handleCache;
- private final ReadAheadCache readAheadCache;
-
- // ReadAhead Status
- volatile boolean running = true;
- Promise<Void> stopPromise = null;
- private volatile boolean isCatchingUp = true;
- private volatile boolean logDeleted = false;
- private volatile boolean readAheadError = false;
- private volatile boolean readAheadInterrupted = false;
- private volatile boolean readingFromTruncated = false;
-
- // Exceptions Handling
- volatile boolean encounteredException = false;
- private final AtomicInteger bkcZkExceptions = new AtomicInteger(0);
- private final AtomicInteger bkcUnExpectedExceptions = new AtomicInteger(0);
- private final int noLedgerExceptionOnReadLACThreshold;
- private final AtomicInteger bkcNoLedgerExceptionsOnReadLAC = new AtomicInteger(0);
-
- // Read Ahead Positions
- private final LedgerReadPosition startReadPosition;
- protected LedgerReadPosition nextReadAheadPosition;
-
- //
- // LogSegments & Metadata Notification
- //
-
- // variables related to zookeeper watcher notification to interrupt long poll waits
- final Object notificationLock = new Object();
- AsyncNotification metadataNotification = null;
- volatile long metadataNotificationTimeMillis = -1L;
-
- // variables related to log segments
- private volatile boolean reInitializeMetadata = true;
- private volatile boolean forceReadLogSegments = false;
- volatile boolean inProgressChanged = false;
- private LogSegmentMetadata currentMetadata = null;
- private int currentMetadataIndex;
- protected LedgerDescriptor currentLH;
- private volatile List<LogSegmentMetadata> logSegmentListNotified;
- private volatile List<LogSegmentMetadata> logSegmentList;
-
- //
- // ReadAhead Phases
- //
-
- final Phase schedulePhase = new ScheduleReadAheadPhase();
- final Phase exceptionHandler = new ExceptionHandlePhase(schedulePhase);
- final Phase readAheadPhase =
- new StoppablePhase(
- new CheckInProgressChangedPhase(
- new OpenLedgerPhase(
- new ReadEntriesPhase(schedulePhase))));
-
- //
- // Stats, Tracing and Failure Injection
- //
-
- // failure injector
- private final AsyncFailureInjector failureInjector;
- // trace
- protected final long metadataLatencyWarnThresholdMillis;
- final ReadAheadTracker tracker;
- final Stopwatch resumeStopWatch;
- final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted();
- // Misc
- private final boolean readAheadSkipBrokenEntries;
- // Stats
- private final AlertStatsLogger alertStatsLogger;
- private final StatsLogger readAheadPerStreamStatsLogger;
- private final Counter readAheadWorkerWaits;
- private final Counter readAheadEntryPiggyBackHits;
- private final Counter readAheadEntryPiggyBackMisses;
- private final Counter readAheadReadLACAndEntryCounter;
- private final Counter readAheadCacheFullCounter;
- private final Counter readAheadSkippedBrokenEntries;
- private final Counter idleReaderWarn;
- private final OpStatsLogger readAheadReadEntriesStat;
- private final OpStatsLogger readAheadCacheResumeStat;
- private final OpStatsLogger readAheadLacLagStats;
- private final OpStatsLogger longPollInterruptionStat;
- private final OpStatsLogger metadataReinitializationStat;
- private final OpStatsLogger notificationExecutionStat;
- private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
-
- public ReadAheadWorker(DistributedLogConfiguration conf,
- DynamicDistributedLogConfiguration dynConf,
- LogMetadataForReader logMetadata,
- BKLogHandler ledgerManager,
- OrderedScheduler scheduler,
- LedgerHandleCache handleCache,
- LedgerReadPosition startPosition,
- ReadAheadCache readAheadCache,
- boolean isHandleForReading,
- ReadAheadExceptionsLogger readAheadExceptionsLogger,
- StatsLogger handlerStatsLogger,
- StatsLogger readAheadPerStreamStatsLogger,
- AlertStatsLogger alertStatsLogger,
- AsyncFailureInjector failureInjector,
- AsyncNotification notification) {
- // Log information
- this.fullyQualifiedName = logMetadata.getFullyQualifiedName();
- this.conf = conf;
- this.dynConf = dynConf;
- this.logMetadata = logMetadata;
- this.bkLedgerManager = ledgerManager;
- this.isHandleForReading = isHandleForReading;
- this.notification = notification;
- // Resources
- this.scheduler = scheduler;
- this.handleCache = handleCache;
- this.readAheadCache = readAheadCache;
- // Readahead status
- this.startReadPosition = new LedgerReadPosition(startPosition);
- this.nextReadAheadPosition = new LedgerReadPosition(startPosition);
- // LogSegments
-
- // Failure Detection
- this.failureInjector = failureInjector;
- // Tracing
- this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();
- this.noLedgerExceptionOnReadLACThreshold =
- conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() / conf.getReadAheadWaitTime();
- this.tracker = new ReadAheadTracker(logMetadata.getLogName(), readAheadCache,
- ReadAheadPhase.SCHEDULE_READAHEAD, readAheadPerStreamStatsLogger);
- this.resumeStopWatch = Stopwatch.createUnstarted();
- // Misc
- this.readAheadSkipBrokenEntries = conf.getReadAheadSkipBrokenEntries();
- // Stats
- this.alertStatsLogger = alertStatsLogger;
- this.readAheadPerStreamStatsLogger = readAheadPerStreamStatsLogger;
- StatsLogger readAheadStatsLogger = handlerStatsLogger.scope("readahead_worker");
- readAheadWorkerWaits = readAheadStatsLogger.getCounter("wait");
- readAheadEntryPiggyBackHits = readAheadStatsLogger.getCounter("entry_piggy_back_hits");
- readAheadEntryPiggyBackMisses = readAheadStatsLogger.getCounter("entry_piggy_back_misses");
- readAheadReadEntriesStat = readAheadStatsLogger.getOpStatsLogger("read_entries");
- readAheadReadLACAndEntryCounter = readAheadStatsLogger.getCounter("read_lac_and_entry_counter");
- readAheadCacheFullCounter = readAheadStatsLogger.getCounter("cache_full");
- readAheadSkippedBrokenEntries = readAheadStatsLogger.getCounter("skipped_broken_entries");
- readAheadCacheResumeStat = readAheadStatsLogger.getOpStatsLogger("resume");
- readAheadLacLagStats = readAheadStatsLogger.getOpStatsLogger("read_lac_lag");
- longPollInterruptionStat = readAheadStatsLogger.getOpStatsLogger("long_poll_interruption");
- notificationExecutionStat = readAheadStatsLogger.getOpStatsLogger("notification_execution");
- metadataReinitializationStat = readAheadStatsLogger.getOpStatsLogger("metadata_reinitialization");
- idleReaderWarn = readAheadStatsLogger.getCounter("idle_reader_warn");
- this.readAheadExceptionsLogger = readAheadExceptionsLogger;
- }
-
- @VisibleForTesting
- public LedgerReadPosition getNextReadAheadPosition() {
- return nextReadAheadPosition;
- }
-
- public LedgerDescriptor getCurrentLedgerDescriptor() {
- return currentLH;
- }
-
- //
- // ReadAhead Status
- //
-
- void setReadAheadError(ReadAheadTracker tracker, Throwable cause) {
- LOG.error("Read Ahead for {} is set to error.", logMetadata.getFullyQualifiedName());
- readAheadError = true;
- tracker.enterPhase(ReadAheadPhase.ERROR);
- if (null != notification) {
- notification.notifyOnError(cause);
- }
- if (null != stopPromise) {
- FutureUtils.setValue(stopPromise, null);
- }
- }
-
- void setReadAheadInterrupted(ReadAheadTracker tracker) {
- readAheadInterrupted = true;
- tracker.enterPhase(ReadAheadPhase.INTERRUPTED);
- if (null != notification) {
- notification.notifyOnError(new DLInterruptedException("ReadAhead worker for "
- + bkLedgerManager.getFullyQualifiedName() + " is interrupted."));
- }
- if (null != stopPromise) {
- FutureUtils.setValue(stopPromise, null);
- }
- }
-
- void setReadingFromTruncated(ReadAheadTracker tracker) {
- readingFromTruncated = true;
- tracker.enterPhase(ReadAheadPhase.TRUNCATED);
- if (null != notification) {
- notification.notifyOnError(
- new AlreadyTruncatedTransactionException(logMetadata.getFullyQualifiedName()
- + ": Trying to position read ahead to a segment that is marked truncated"));
- }
- if (null != stopPromise) {
- FutureUtils.setValue(stopPromise, null);
- }
- }
-
- private void setReadAheadStopped() {
- tracker.enterPhase(ReadAheadPhase.STOPPED);
- if (null != stopPromise) {
- FutureUtils.setValue(stopPromise, null);
- }
- LOG.info("Stopped ReadAheadWorker for {}", fullyQualifiedName);
- }
-
- public void checkClosedOrInError()
- throws LogNotFoundException, LogReadException, DLInterruptedException,
- AlreadyTruncatedTransactionException {
- if (logDeleted) {
- throw new LogNotFoundException(logMetadata.getFullyQualifiedName() + " is already deleted.");
- } else if (readingFromTruncated) {
- throw new AlreadyTruncatedTransactionException(
- String.format("%s: Trying to position read ahead a segment that is marked truncated",
- logMetadata.getFullyQualifiedName()));
- } else if (readAheadInterrupted) {
- throw new DLInterruptedException(String.format("%s: ReadAhead Thread was interrupted",
- logMetadata.getFullyQualifiedName()));
- } else if (readAheadError) {
- throw new LogReadException(String.format("%s: ReadAhead Thread encountered exceptions",
- logMetadata.getFullyQualifiedName()));
- }
- }
-
- public boolean isCaughtUp() {
- return !isCatchingUp;
- }
-
- public void start(List<LogSegmentMetadata> segmentList) {
- LOG.debug("Starting ReadAhead Worker for {} : segments = {}",
- fullyQualifiedName, segmentList);
- running = true;
- logSegmentListNotified = segmentList;
- schedulePhase.process(BKException.Code.OK);
- }
-
- @Override
- public Future<Void> asyncClose() {
- LOG.info("Stopping Readahead worker for {}", fullyQualifiedName);
- running = false;
- // Unregister associated gauages to prevent GC spiral
- this.tracker.unregisterGauge();
-
- // Aside from unfortunate naming of variables, this allows
- // the currently active long poll to be interrupted and completed
- AsyncNotification notification;
- synchronized (notificationLock) {
- notification = metadataNotification;
- metadataNotification = null;
- }
- if (null != notification) {
- notification.notifyOnOperationComplete();
- }
- if (null == stopPromise) {
- return Future.Void();
- }
- return FutureUtils.ignore(FutureUtils.within(
- stopPromise,
- 2 * conf.getReadAheadWaitTime(),
- TimeUnit.MILLISECONDS,
- new TimeoutException("Timeout on waiting for ReadAhead worker to stop " + fullyQualifiedName),
- scheduler,
- fullyQualifiedName));
- }
-
- @Override
- public String toString() {
- return "Running:" + running +
- ", NextReadAheadPosition:" + nextReadAheadPosition +
- ", BKZKExceptions:" + bkcZkExceptions.get() +
- ", BKUnexpectedExceptions:" + bkcUnExpectedExceptions.get() +
- ", EncounteredException:" + encounteredException +
- ", readAheadError:" + readAheadError +
- ", readAheadInterrupted" + readAheadInterrupted +
- ", CurrentMetadata:" + ((null != currentMetadata) ? currentMetadata : "NONE") +
- ", FailureInjector:" + failureInjector;
- }
-
- @Override
- public void resumeReadAhead() {
- try {
- long cacheResumeLatency = resumeStopWatch.stop().elapsed(TimeUnit.MICROSECONDS);
- readAheadCacheResumeStat.registerSuccessfulEvent(cacheResumeLatency);
- } catch (IllegalStateException ise) {
- LOG.error("Encountered illegal state when stopping resume stop watch for {} : ",
- logMetadata.getFullyQualifiedName(), ise);
- }
- submit(this);
- }
-
- Runnable addRTEHandler(final Runnable runnable) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- runnable.run();
- } catch (RuntimeException rte) {
- LOG.error("ReadAhead on stream {} encountered runtime exception",
- logMetadata.getFullyQualifiedName(), rte);
- setReadAheadError(tracker, rte);
- throw rte;
- }
- }
- };
- }
-
- <T> Function1<T, BoxedUnit> submit(final Function1<T, BoxedUnit> function) {
- return new AbstractFunction1<T, BoxedUnit>() {
- @Override
- public BoxedUnit apply(final T input) {
- submit(new Runnable() {
- @Override
- public void run() {
- function.apply(input);
- }
- });
- return BoxedUnit.UNIT;
- }
- };
- }
-
- void submit(Runnable runnable) {
- if (failureInjector.shouldInjectStops()) {
- LOG.warn("Error injected: read ahead for stream {} is going to stall.",
- logMetadata.getFullyQualifiedName());
- return;
- }
-
- if (failureInjector.shouldInjectDelays()) {
- int delayMs = failureInjector.getInjectedDelayMs();
- schedule(runnable, delayMs);
- return;
- }
-
- try {
- scheduler.submit(addRTEHandler(runnable));
- } catch (RejectedExecutionException ree) {
- setReadAheadError(tracker, ree);
- }
- }
-
- private void schedule(Runnable runnable, long timeInMillis) {
- try {
- InterruptibleScheduledRunnable task = new InterruptibleScheduledRunnable(runnable);
- boolean executeImmediately = setMetadataNotification(task);
- if (executeImmediately) {
- scheduler.submit(addRTEHandler(task));
- return;
- }
- scheduler.schedule(addRTEHandler(task), timeInMillis, TimeUnit.MILLISECONDS);
- readAheadWorkerWaits.inc();
- } catch (RejectedExecutionException ree) {
- setReadAheadError(tracker, ree);
- }
- }
-
- private void handleException(ReadAheadPhase phase, int returnCode) {
- readAheadExceptionsLogger.getBKExceptionStatsLogger(phase.name()).getExceptionCounter(returnCode).inc();
- exceptionHandler.process(returnCode);
- }
-
- private boolean closeCurrentLedgerHandle() {
- if (currentLH == null) {
- return true;
- }
- boolean retVal = false;
- LedgerDescriptor ld = currentLH;
- try {
- handleCache.closeLedger(ld);
- currentLH = null;
- retVal = true;
- } catch (BKException bke) {
- LOG.debug("BK Exception during closing {} : ", ld, bke);
- handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
- }
-
- return retVal;
- }
-
- abstract class Phase {
-
- final Phase next;
-
- Phase(Phase next) {
- this.next = next;
- }
-
- abstract void process(int rc);
- }
-
- /**
- * Schedule next readahead request. If we need to backoff, schedule in a backoff delay.
- */
- final class ScheduleReadAheadPhase extends Phase {
-
- ScheduleReadAheadPhase() {
- super(null);
- }
-
- @Override
- void process(int rc) {
- if (!running) {
- setReadAheadStopped();
- return;
- }
- tracker.enterPhase(ReadAheadPhase.SCHEDULE_READAHEAD);
-
- boolean injectErrors = failureInjector.shouldInjectErrors();
- if (encounteredException || injectErrors) {
- int zkErrorThreshold = BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS * 1000 * 4 / conf.getReadAheadWaitTime();
-
- if ((bkcZkExceptions.get() > zkErrorThreshold) || injectErrors) {
- LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}",
- new Object[] { fullyQualifiedName, bkcZkExceptions.get(), injectErrors });
- running = false;
- setReadAheadError(tracker, new LogReadException(
- "Encountered too many zookeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
- } else if (bkcUnExpectedExceptions.get() > BKC_UNEXPECTED_EXCEPTION_THRESHOLD) {
- LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.",
- fullyQualifiedName, bkcUnExpectedExceptions.get());
- running = false;
- setReadAheadError(tracker, new LogReadException(
- "Encountered too many unexpected bookkeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
- } else {
- // We must always reinitialize metadata if the last attempt to read failed.
- reInitializeMetadata = true;
- encounteredException = false;
- // Backoff before resuming
- if (LOG.isTraceEnabled()) {
- LOG.trace("Scheduling read ahead for {} after {} ms.", fullyQualifiedName, conf.getReadAheadWaitTime() / 4);
- }
- schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime() / 4);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Scheduling read ahead for {} now.", fullyQualifiedName);
- }
- submit(ReadAheadWorker.this);
- }
- }
-
- }
-
- /**
- * Phase on handling exceptions.
- */
- final class ExceptionHandlePhase extends Phase {
-
- ExceptionHandlePhase(Phase next) {
- super(next);
- }
-
- @Override
- void process(int rc) {
- tracker.enterPhase(ReadAheadPhase.EXCEPTION_HANDLING);
-
- if (BKException.Code.InterruptedException == rc) {
- LOG.trace("ReadAhead Worker for {} is interrupted.", fullyQualifiedName);
- running = false;
- setReadAheadInterrupted(tracker);
- return;
- } else if (BKException.Code.ZKException == rc) {
- encounteredException = true;
- int numExceptions = bkcZkExceptions.incrementAndGet();
- LOG.debug("ReadAhead Worker for {} encountered zookeeper exception : total exceptions are {}.",
- fullyQualifiedName, numExceptions);
- } else if (BKException.Code.OK != rc) {
- encounteredException = true;
- switch(rc) {
- case BKException.Code.NoSuchEntryException:
- case BKException.Code.LedgerRecoveryException:
- case BKException.Code.NoSuchLedgerExistsException:
- break;
- default:
- bkcUnExpectedExceptions.incrementAndGet();
- }
- LOG.info("ReadAhead Worker for {} encountered exception : {}",
- fullyQualifiedName, BKException.create(rc));
- }
- // schedule next read ahead
- next.process(BKException.Code.OK);
- }
- }
-
- /**
- * A phase that could be stopped by a stopPromise
- */
- final class StoppablePhase extends Phase {
-
- StoppablePhase(Phase next) {
- super(next);
- }
-
- @Override
- void process(int rc) {
- if (!running) {
- setReadAheadStopped();
- return;
- }
-
- if (null == stopPromise) {
- stopPromise = new Promise<Void>();
- }
-
- // proceed the readahead request
- next.process(BKException.Code.OK);
- }
- }
-
- /**
- * Phase on checking in progress changed.
- */
- final class CheckInProgressChangedPhase extends Phase
- implements FutureEventListener<Versioned<List<LogSegmentMetadata>>> {
-
- CheckInProgressChangedPhase(Phase next) {
- super(next);
- }
-
- void processLogSegments(final List<LogSegmentMetadata> segments) {
- // submit callback execution to dlg executor to avoid deadlock.
- submit(new Runnable() {
- @Override
- public void run() {
- logSegmentList = segments;
- boolean isInitialPositioning = nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition);
- for (int i = 0; i < logSegmentList.size(); i++) {
- LogSegmentMetadata l = logSegmentList.get(i);
- // By default we should skip truncated segments during initial positioning
- if (l.isTruncated() &&
- isInitialPositioning &&
- !conf.getIgnoreTruncationStatus()) {
- continue;
- }
-
- DLSN nextReadDLSN = new DLSN(nextReadAheadPosition.getLogSegmentSequenceNumber(),
- nextReadAheadPosition.getEntryId(), -1);
-
- // next read position still inside a log segment
- final boolean hasDataToRead = (l.getLastDLSN().compareTo(nextReadDLSN) >= 0);
-
- // either there is data to read in current log segment or we are moving over a log segment that is
- // still inprogress or was inprogress, we have check (or maybe close) this log segment.
- final boolean checkOrCloseLedger = hasDataToRead ||
- // next read position move over a log segment, if l is still inprogress or it was inprogress
- ((l.isInProgress() || (null != currentMetadata && currentMetadata.isInProgress())) &&
- l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber());
-
- // If we are positioning on a partially truncated log segment then the truncation point should
- // be before the nextReadPosition
- if (l.isPartiallyTruncated() &&
- !isInitialPositioning &&
- (l.getMinActiveDLSN().compareTo(nextReadDLSN) > 0)) {
- if (conf.getAlertWhenPositioningOnTruncated()) {
- alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
- nextReadAheadPosition, l);
- }
-
- if (!conf.getIgnoreTruncationStatus()) {
- LOG.error("{}: Trying to position reader on {} when {} is marked partially truncated",
- new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
- setReadingFromTruncated(tracker);
- return;
- }
- }
-
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("CheckLogSegment : newMetadata = {}, currentMetadata = {}, nextReadAheadPosition = {}",
- new Object[] { l, currentMetadata, nextReadAheadPosition});
- }
-
- if (checkOrCloseLedger) {
- long startBKEntry = 0;
- if (l.isPartiallyTruncated() && !conf.getIgnoreTruncationStatus()) {
- startBKEntry = l.getMinActiveDLSN().getEntryId();
- }
-
- if(l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()) {
- startBKEntry = Math.max(startBKEntry, nextReadAheadPosition.getEntryId());
- if (currentMetadata != null) {
- inProgressChanged = currentMetadata.isInProgress() && !l.isInProgress();
- }
- } else {
- // We are positioning on a new ledger => reset the current ledger handle
- LOG.trace("Positioning {} on a new ledger {}", fullyQualifiedName, l);
-
- if (!closeCurrentLedgerHandle()) {
- return;
- }
- }
-
- nextReadAheadPosition = new LedgerReadPosition(l.getLogSegmentId(), l.getLogSegmentSequenceNumber(), startBKEntry);
- if (conf.getTraceReadAheadMetadataChanges()) {
- LOG.info("Moved read position to {} for stream {} at {}.",
- new Object[] {nextReadAheadPosition, logMetadata.getFullyQualifiedName(), System.currentTimeMillis() });
- }
-
- if (l.isTruncated()) {
- if (conf.getAlertWhenPositioningOnTruncated()) {
- alertStatsLogger.raise("Trying to position reader on {} when {} is marked truncated",
- nextReadAheadPosition, l);
- }
-
- if (!conf.getIgnoreTruncationStatus()) {
- LOG.error("{}: Trying to position reader on {} when {} is marked truncated",
- new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
- setReadingFromTruncated(tracker);
- return;
- }
- }
-
- currentMetadata = l;
- currentMetadataIndex = i;
- break;
- }
-
- // Handle multiple in progress => stop at the first in progress
- if (l.isInProgress()) {
- break;
- }
- }
-
- if (null == currentMetadata) {
- if (isCatchingUp) {
- isCatchingUp = false;
- if (isHandleForReading) {
- LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.",
- new Object[] { fullyQualifiedName, System.currentTimeMillis(), nextReadAheadPosition });
- }
- }
- schedule(ReadAheadWorker.this, conf.getReadAheadWaitTimeOnEndOfStream());
- if (LOG.isDebugEnabled()) {
- LOG.debug("No log segment to position on for {}. Backing off for {} millseconds",
- fullyQualifiedName, conf.getReadAheadWaitTimeOnEndOfStream());
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized metadata for {}, starting reading ahead from {} : {}.",
- new Object[] { fullyQualifiedName, currentMetadataIndex, currentMetadata });
- }
- next.process(BKException.Code.OK);
- }
-
- // Once we have read the ledger list for the first time, subsequent segments
- // should be read in a streaming manner and we should get the new ledgers as
- // they close in ZK through watchers.
- // So lets start observing the latency
- bkLedgerManager.reportGetSegmentStats(true);
- }
- });
- }
-
- @Override
- void process(int rc) {
- if (!running) {
- setReadAheadStopped();
- return;
- }
-
- tracker.enterPhase(ReadAheadPhase.GET_LEDGERS);
-
- inProgressChanged = false;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking {} if InProgress changed.", fullyQualifiedName);
- }
-
- if (reInitializeMetadata || null == currentMetadata) {
- reInitializeMetadata = false;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reinitializing metadata for {}.", fullyQualifiedName);
- }
- if (metadataNotificationTimeMillis > 0) {
- long metadataReinitializeTimeMillis = System.currentTimeMillis();
- long elapsedMillisSinceMetadataChanged = metadataReinitializeTimeMillis - metadataNotificationTimeMillis;
- if (elapsedMillisSinceMetadataChanged >= metadataLatencyWarnThresholdMillis) {
- LOG.warn("{} reinitialize metadata at {}, which is {} millis after receiving notification at {}.",
- new Object[] { logMetadata.getFullyQualifiedName(), metadataReinitializeTimeMillis,
- elapsedMillisSinceMetadataChanged, metadataNotificationTimeMillis});
- }
- metadataReinitializationStat.registerSuccessfulEvent(
- TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged));
- metadataNotificationTimeMillis = -1L;
- }
- if (forceReadLogSegments) {
- forceReadLogSegments = false;
- bkLedgerManager.readLogSegmentsFromStore(
- LogSegmentMetadata.COMPARATOR,
- LogSegmentFilter.DEFAULT_FILTER,
- null
- ).addEventListener(this);
- } else {
- processLogSegments(logSegmentListNotified);
- }
- } else {
- next.process(BKException.Code.OK);
- }
- }
-
- @Override
- public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
- processLogSegments(segments.getValue());
- }
-
- @Override
- public void onFailure(Throwable cause) {
- LOG.info("Encountered metadata exception while reading log segments of {} : {}. Retrying ...",
- bkLedgerManager.getFullyQualifiedName(), cause.getMessage());
- reInitializeMetadata = true;
- forceReadLogSegments = true;
- handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException);
- }
- }
-
- final class OpenLedgerPhase extends Phase
- implements BookkeeperInternalCallbacks.GenericCallback<LedgerDescriptor>,
- AsyncCallback.ReadLastConfirmedAndEntryCallback {
-
- OpenLedgerPhase(Phase next) {
- super(next);
- }
-
- private void issueReadLastConfirmedAndEntry(final boolean parallel,
- final long lastAddConfirmed) {
- final String ctx = String.format("ReadLastConfirmedAndEntry(%s, %d)", parallel? "Parallel":"Sequential", lastAddConfirmed);
- final ReadLastConfirmedAndEntryCallbackWithNotification callback =
- new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, ctx);
- boolean callbackImmediately = setMetadataNotification(callback);
- handleCache.asyncReadLastConfirmedAndEntry(
- currentLH,
- nextReadAheadPosition.getEntryId(),
- conf.getReadLACLongPollTimeout(),
- parallel
- ).addEventListener(new FutureEventListener<Pair<Long, LedgerEntry>>() {
- @Override
- public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) {
- callback.readLastConfirmedAndEntryComplete(
- BKException.Code.OK,
- lacAndEntry.getLeft(),
- lacAndEntry.getRight(),
- ctx);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- callback.readLastConfirmedAndEntryComplete(
- FutureUtils.bkResultCode(cause),
- lastAddConfirmed,
- null,
- ctx);
- }
- });
- callback.callbackImmediately(callbackImmediately);
- readAheadReadLACAndEntryCounter.inc();
- }
-
- @Override
- void process(int rc) {
- if (!running) {
- setReadAheadStopped();
- return;
- }
-
- tracker.enterPhase(ReadAheadPhase.OPEN_LEDGER);
-
- if (currentMetadata.isInProgress()) { // we don't want to fence the current journal
- if (null == currentLH) {
- if (conf.getTraceReadAheadMetadataChanges()) {
- LOG.info("Opening inprogress ledger of {} for {} at {}.",
- new Object[] { currentMetadata, fullyQualifiedName, System.currentTimeMillis() });
- }
- handleCache.asyncOpenLedger(currentMetadata, false)
- .addEventListener(new FutureEventListener<LedgerDescriptor>() {
- @Override
- public void onSuccess(LedgerDescriptor ld) {
- operationComplete(BKException.Code.OK, ld);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- operationComplete(FutureUtils.bkResultCode(cause), null);
- }
- });
- } else {
- final long lastAddConfirmed;
- try {
- lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
- } catch (BKException ie) {
- // Exception is thrown due to no ledger handle
- handleException(ReadAheadPhase.OPEN_LEDGER, ie.getCode());
- return;
- }
-
- if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
- // This indicates that the currentMetadata is still marked in
- // progress while we have already read all the entries. It might
- // indicate a failure to detect metadata change. So we
- // should probably try force read log segments if the reader has
- // been idle for after a while.
- if (LACNotAdvancedStopWatch.isRunning()) {
- if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS)
- > conf.getReaderIdleWarnThresholdMillis()) {
- idleReaderWarn.inc();
- LOG.info("{} Ledger {} for inprogress segment {}, reader has been idle for warn threshold {}",
- new Object[] { fullyQualifiedName, currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() });
- reInitializeMetadata = true;
- forceReadLogSegments = true;
- }
- } else {
- LACNotAdvancedStopWatch.reset().start();
- if (conf.getTraceReadAheadMetadataChanges()) {
- LOG.info("{} Ledger {} for inprogress segment {} closed",
- new Object[] { fullyQualifiedName, currentMetadata, currentLH });
- }
- }
-
- tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
-
- // the readahead is caught up if current ledger is in progress and read position moves over last add confirmed
- if (isCatchingUp) {
- isCatchingUp = false;
- if (isHandleForReading) {
- LOG.info("{} caught up at {}: lac = {}, position = {}.",
- new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition });
- }
- }
-
- LOG.trace("Reading last add confirmed of {} for {}, as read poistion has moved over {} : {}",
- new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition });
-
- if (nextReadAheadPosition.getEntryId() == 0 && conf.getTraceReadAheadMetadataChanges()) {
- // we are waiting for first entry to arrive
- LOG.info("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
- new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
- } else {
- LOG.trace("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
- new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
- }
- issueReadLastConfirmedAndEntry(false, lastAddConfirmed);
- } else {
- next.process(BKException.Code.OK);
- }
- }
- } else {
- LACNotAdvancedStopWatch.reset();
- if (null != currentLH) {
- try {
- if (inProgressChanged) {
- LOG.trace("Closing completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
- if (!closeCurrentLedgerHandle()) {
- return;
- }
- inProgressChanged = false;
- } else {
- long lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
- if (nextReadAheadPosition.getEntryId() > lastAddConfirmed) {
- // Its possible that the last entryId does not account for the control
- // log record, but the lastAddConfirmed should never be short of the
- // last entry id; else we maybe missing an entry
- boolean gapDetected = false;
- if (lastAddConfirmed < currentMetadata.getLastEntryId()) {
- alertStatsLogger.raise("Unexpected last entry id during read ahead; {} , {}",
- currentMetadata, lastAddConfirmed);
- gapDetected = true;
- }
-
- if (conf.getPositionGapDetectionEnabled() && gapDetected) {
- setReadAheadError(tracker, new UnexpectedException(
- "Unexpected last entry id during read ahead : " + currentMetadata
- + ", lac = " + lastAddConfirmed));
- } else {
- // This disconnect will only surface during repositioning and
- // will not silently miss records; therefore its safe to not halt
- // reading, but we should print a warning for easy diagnosis
- if (conf.getTraceReadAheadMetadataChanges() && lastAddConfirmed > (currentMetadata.getLastEntryId() + 1)) {
- LOG.warn("Potential Metadata Corruption {} for stream {}, lastAddConfirmed {}",
- new Object[] {currentMetadata, logMetadata.getFullyQualifiedName(), lastAddConfirmed});
- }
-
- LOG.trace("Past the last Add Confirmed {} in ledger {} for {}",
- new Object[] { lastAddConfirmed, currentMetadata, fullyQualifiedName });
- if (!closeCurrentLedgerHandle()) {
- return;
- }
- LogSegmentMetadata oldMetadata = currentMetadata;
- currentMetadata = null;
- if (currentMetadataIndex + 1 < logSegmentList.size()) {
- currentMetadata = logSegmentList.get(++currentMetadataIndex);
- if (currentMetadata.getLogSegmentSequenceNumber() != (oldMetadata.getLogSegmentSequenceNumber() + 1)) {
- // We should never get here as we should have exited the loop if
- // pendingRequests were empty
- alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}",
- currentMetadata, oldMetadata);
- setReadAheadError(tracker, new UnexpectedException(
- "Unexpected condition during read ahead : current metadata "
- + currentMetadata + ", old metadata " + oldMetadata));
- } else {
- if (currentMetadata.isTruncated()) {
- if (conf.getAlertWhenPositioningOnTruncated()) {
- alertStatsLogger.raise("Trying to position reader on the log segment that is marked truncated : {}",
- currentMetadata);
- }
-
- if (!conf.getIgnoreTruncationStatus()) {
- LOG.error("{}: Trying to position reader on the log segment that is marked truncated : {}",
- logMetadata.getFullyQualifiedName(), currentMetadata);
- setReadingFromTruncated(tracker);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Moving read position to a new ledger {} for {}.",
- currentMetadata, fullyQualifiedName);
- }
- nextReadAheadPosition.positionOnNewLogSegment(currentMetadata.getLogSegmentId(), currentMetadata.getLogSegmentSequenceNumber());
- }
- }
- }
- }
- }
- }
- if (!readAheadError) {
- next.process(BKException.Code.OK);
- }
- } catch (BKException bke) {
- LOG.debug("Exception while repositioning", bke);
- handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
- }
- } else {
- LOG.trace("Opening completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
- handleCache.asyncOpenLedger(currentMetadata, true)
- .addEventListener(new FutureEventListener<LedgerDescriptor>() {
- @Override
- public void onSuccess(LedgerDescriptor ld) {
- operationComplete(BKException.Code.OK, ld);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- operationComplete(FutureUtils.bkResultCode(cause), null);
- }
- });
- }
- }
-
- }
-
- @Override
- public void operationComplete(final int rc, final LedgerDescriptor result) {
- // submit callback execution to dlg executor to avoid deadlock.
- submit(new Runnable() {
- @Override
- public void run() {
- if (BKException.Code.OK != rc) {
- LOG.debug("BK Exception {} while opening ledger", rc);
- handleException(ReadAheadPhase.OPEN_LEDGER, rc);
- return;
- }
- currentLH = result;
- if (conf.getTraceReadAheadMetadataChanges()) {
- LOG.info("Opened ledger of {} for {} at {}.",
- new Object[]{currentMetadata, fullyQualifiedName, System.currentTimeMillis()});
- }
- bkcZkExceptions.set(0);
- bkcUnExpectedExceptions.set(0);
- bkcNoLedgerExceptionsOnReadLAC.set(0);
- next.process(rc);
- }
- });
- }
-
- /**
- * Handle the result of reading last add confirmed.
- *
- * @param rc
- * result of reading last add confirmed
- */
- private void handleReadLastConfirmedError(int rc) {
- if (BKException.Code.NoSuchLedgerExistsException == rc) {
- if (bkcNoLedgerExceptionsOnReadLAC.incrementAndGet() > noLedgerExceptionOnReadLACThreshold) {
- LOG.info("{} No entries published to ledger {} yet for {} millis.",
- new Object[] { fullyQualifiedName, currentLH, conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() });
- bkcNoLedgerExceptionsOnReadLAC.set(0);
- // set current ledger handle to null, so it would be re-opened again.
- // if the ledger does disappear, it would trigger re-initialize log segments on handling openLedger exceptions
- if (closeCurrentLedgerHandle()) {
- next.process(BKException.Code.OK);
- }
- return;
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.info("{} No entries published to ledger {} yet. Backoff reading ahead for {} ms.",
- new Object[]{fullyQualifiedName, currentLH, conf.getReadAheadWaitTime()});
- }
- // Backoff before resuming
- schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime());
- return;
- }
- } else if (BKException.Code.OK != rc) {
- handleException(ReadAheadPhase.READ_LAST_CONFIRMED, rc);
- return;
- }
- }
-
- public void readLastConfirmedAndEntryComplete(final int rc, final long lastConfirmed, final LedgerEntry entry,
- final Object ctx) {
- // submit callback execution to dlg executor to avoid deadlock.
- submit(new Runnable() {
- @Override
- public void run() {
- if (BKException.Code.OK != rc) {
- handleReadLastConfirmedError(rc);
- return;
- }
- bkcZkExceptions.set(0);
- bkcUnExpectedExceptions.set(0);
- bkcNoLedgerExceptionsOnReadLAC.set(0);
- if (LOG.isTraceEnabled()) {
- try {
- LOG.trace("Advancing Last Add Confirmed of {} for {} : {}, {}",
- new Object[] { currentMetadata, fullyQualifiedName, lastConfirmed,
- handleCache.getLastAddConfirmed(currentLH) });
- } catch (BKException exc) {
- // Ignore
- }
- }
-
- if ((null != entry)
- && (nextReadAheadPosition.getEntryId() == entry.getEntryId())
- && (nextReadAheadPosition.getLedgerId() == entry.getLedgerId())) {
- if (lastConfirmed <= 4 && conf.getTraceReadAheadMetadataChanges()) {
- LOG.info("Hit readLastConfirmedAndEntry for {} at {} : entry = {}, lac = {}, position = {}.",
- new Object[] { fullyQualifiedName, System.currentTimeMillis(),
- entry.getEntryId(), lastConfirmed, nextReadAheadPosition });
- }
-
- if (!isCatchingUp) {
- long lac = lastConfirmed - nextReadAheadPosition.getEntryId();
- if (lac > 0) {
- readAheadLacLagStats.registerSuccessfulEvent(lac);
- }
- }
-
- nextReadAheadPosition.advance();
-
- readAheadCache.set(new LedgerReadPosition(entry.getLedgerId(), currentLH.getLogSegmentSequenceNo(), entry.getEntryId()),
- entry, null != ctx ? ctx.toString() : "",
- currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reading the value received {} for {} : entryId {}",
- new Object[] { currentMetadata, fullyQualifiedName, entry.getEntryId() });
- }
- readAheadEntryPiggyBackHits.inc();
- } else {
- if (lastConfirmed > nextReadAheadPosition.getEntryId()) {
- LOG.info("{} : entry {} isn't piggybacked but last add confirmed already moves to {}.",
- new Object[] { logMetadata.getFullyQualifiedName(), nextReadAheadPosition.getEntryId(), lastConfirmed });
- }
- readAheadEntryPiggyBackMisses.inc();
- }
- next.process(rc);
- }
- });
- }
- }
-
- final class ReadEntriesPhase extends Phase implements Runnable {
-
- boolean cacheFull = false;
- long lastAddConfirmed = -1;
-
- ReadEntriesPhase(Phase next) {
- super(next);
- }
-
- @Override
- void process(int rc) {
- if (!running) {
- setReadAheadStopped();
- return;
- }
-
- tracker.enterPhase(ReadAheadPhase.READ_ENTRIES);
-
- cacheFull = false;
- lastAddConfirmed = -1;
- if (null != currentLH) {
- try {
- lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
- } catch (BKException bke) {
- handleException(ReadAheadPhase.READ_LAST_CONFIRMED, bke.getCode());
- return;
- }
- read();
- } else {
- complete();
- }
- }
-
- private void read() {
- if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Nothing to read for {} of {} : lastAddConfirmed = {}, nextReadAheadPosition = {}",
- new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition});
- }
- complete();
- return;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reading entry {} for {} of {}.",
- new Object[] {nextReadAheadPosition, currentMetadata, fullyQualifiedName });
- }
- int readAheadBatchSize = dynConf.getReadAheadBatchSize();
- final long startEntryId = nextReadAheadPosition.getEntryId();
- final long endEntryId = Math.min(lastAddConfirmed, (nextReadAheadPosition.getEntryId() + readAheadBatchSize - 1));
-
- if (endEntryId <= readAheadBatchSize && conf.getTraceReadAheadMetadataChanges()) {
- // trace first read batch
- LOG.info("Reading entries ({} - {}) for {} at {} : lac = {}, nextReadAheadPosition = {}.",
- new Object[] { startEntryId, endEntryId, fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
- }
-
- final String readCtx = String.format("ReadEntries(%d-%d)", startEntryId, endEntryId);
- handleCache.asyncReadEntries(currentLH, startEntryId, endEntryId)
- .addEventListener(new FutureEventListener<Enumeration<LedgerEntry>>() {
-
- @Override
- public void onSuccess(Enumeration<LedgerEntry> entries) {
- int rc = BKException.Code.OK;
-
- if (failureInjector.shouldInjectCorruption(startEntryId, endEntryId)) {
- rc = BKException.Code.DigestMatchException;
- }
- readComplete(rc, null, entries, readCtx, startEntryId, endEntryId);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- readComplete(FutureUtils.bkResultCode(cause), null, null, readCtx, startEntryId, endEntryId);
- }
- });
- }
-
- public void readComplete(final int rc, final LedgerHandle lh,
- final Enumeration<LedgerEntry> seq, final Object ctx,
- final long startEntryId, final long endEntryId) {
- // submit callback execution to dlg executor to avoid deadlock.
- submit(new Runnable() {
- @Override
- public void run() {
- // If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and
- // stat the issue and move forward.
- if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) {
- readAheadReadEntriesStat.registerFailedEvent(0);
- LOG.error("BK DigestMatchException while reading entries {}-{} in stream {}, entry {} discarded",
- new Object[] { startEntryId, endEntryId, fullyQualifiedName, startEntryId });
- bkcZkExceptions.set(0);
- bkcUnExpectedExceptions.set(0);
- readAheadSkippedBrokenEntries.inc();
- nextReadAheadPosition.advance();
- } else if (BKException.Code.OK != rc) {
- readAheadReadEntriesStat.registerFailedEvent(0);
- LOG.debug("BK Exception {} while reading entry", rc);
- handleException(ReadAheadPhase.READ_ENTRIES, rc);
- return;
- } else {
- int numReads = 0;
- while (seq.hasMoreElements()) {
- bkcZkExceptions.set(0);
- bkcUnExpectedExceptions.set(0);
- nextReadAheadPosition.advance();
- LedgerEntry e = seq.nextElement();
- LedgerReadPosition readPosition = new LedgerReadPosition(e.getLedgerId(), currentMetadata.getLogSegmentSequenceNumber(), e.getEntryId());
- readAheadCache.set(readPosition, e, null != ctx ? ctx.toString() : "",
- currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
- ++numReads;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Read entry {} of {}.", readPosition, fullyQualifiedName);
- }
- }
- readAheadReadEntriesStat.registerSuccessfulEvent(numReads);
- }
- if (readAheadCache.isCacheFull()) {
- cacheFull = true;
- complete();
- } else {
- read();
- }
- }
- });
- }
-
- private void complete() {
- if (cacheFull) {
- LOG.trace("Cache for {} is full. Backoff reading until notified", fullyQualifiedName);
- readAheadCacheFullCounter.inc();
- resumeStopWatch.reset().start();
- stopPromise = null;
- readAheadCache.setReadAheadCallback(ReadAheadWorker.this);
- } else {
- run();
- }
- }
-
- @Override
- public void run() {
- next.process(BKException.Code.OK);
- }
- }
-
- @Override
- public void run() {
- if (!running) {
- setReadAheadStopped();
- return;
- }
- readAheadPhase.process(BKException.Code.OK);
- }
-
- @Override
- public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
- AsyncNotification notification;
- synchronized (notificationLock) {
- logSegmentListNotified = segments;
- reInitializeMetadata = true;
- LOG.debug("{} Read ahead node changed", fullyQualifiedName);
- notification = metadataNotification;
- metadataNotification = null;
- }
- metadataNotificationTimeMillis = System.currentTimeMillis();
- if (null != notification) {
- notification.notifyOnOperationComplete();
- }
- }
-
- @Override
- public void onLogStreamDeleted() {
- logDeleted = true;
- setReadAheadError(tracker, new LogNotFoundException("Log stream "
- + bkLedgerManager.getFullyQualifiedName() + " is deleted."));
- }
-
- /**
- * Set metadata notification and return the flag indicating whether to reinitialize metadata.
- *
- * @param notification
- * metadata notification
- * @return flag indicating whether to reinitialize metadata.
- */
- private boolean setMetadataNotification(AsyncNotification notification) {
- synchronized (notificationLock) {
- this.metadataNotification = notification;
- return reInitializeMetadata;
- }
- }
-
- @VisibleForTesting
- public AsyncNotification getMetadataNotification() {
- synchronized (notificationLock) {
- return metadataNotification;
- }
- }
-
- /**
- * A scheduled runnable that could be waken and executed immediately when notification arrives.
- *
- * E.g
- * <p>
- * The reader reaches end of stream, it backs off to schedule next read in 2 seconds.
- * <br/>
- * if a new log segment is created, without this change, reader has to wait 2 seconds to read
- * entries in new log segment, which means delivery latency of entries in new log segment could
- * be up to 2 seconds. but with this change, the task would be executed immediately, which reader
- * would be waken up from backoff, which would reduce the delivery latency.
- * </p>
- */
- class InterruptibleScheduledRunnable implements AsyncNotification, Runnable {
-
- final Runnable task;
- final AtomicBoolean called = new AtomicBoolean(false);
- final long startNanos;
-
- InterruptibleScheduledRunnable(Runnable task) {
- this.task = task;
- this.startNanos = MathUtils.nowInNano();
- }
-
- @Override
- public void notifyOnError(Throwable t) {
- longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
- execute();
- }
-
- @Override
- public void notifyOnOperationComplete() {
- longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
- execute();
- }
-
- @Override
- public void run() {
- if (called.compareAndSet(false, true)) {
- task.run();
- }
- }
-
- void execute() {
- if (called.compareAndSet(false, true)) {
- submit(task);
- }
- }
- }
-
- abstract class LongPollNotification<T> implements AsyncNotification {
-
- final long lac;
- final T cb;
- final Object ctx;
- final AtomicBoolean called = new AtomicBoolean(false);
- final long startNanos;
-
- LongPollNotification(long lac, T cb, Object ctx) {
- this.lac = lac;
- this.cb = cb;
- this.ctx = ctx;
- this.startNanos = MathUtils.nowInNano();
- }
-
- void complete(boolean success) {
- long startTime = MathUtils.nowInNano();
- doComplete(success);
- if (success) {
- notificationExecutionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTime));
- } else {
- notificationExecutionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startTime));
- }
- }
-
- abstract void doComplete(boolean success);
-
- @Override
- public void notifyOnError(Throwable cause) {
- longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
- complete(false);
- }
-
- @Override
- public void notifyOnOperationComplete() {
- longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
- complete(true);
- }
-
- void callbackImmediately(boolean immediate) {
- if (immediate) {
- complete(true);
- }
- }
- }
-
- class ReadLastConfirmedAndEntryCallbackWithNotification
- extends LongPollNotification<AsyncCallback.ReadLastConfirmedAndEntryCallback>
- implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
-
- ReadLastConfirmedAndEntryCallbackWithNotification(
- long lac, AsyncCallback.ReadLastConfirmedAndEntryCallback cb, Object ctx) {
- super(lac, cb, ctx);
- }
-
- @Override
- public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) {
- if (called.compareAndSet(false, true)) {
- // clear the notification when callback
- synchronized (notificationLock) {
- metadataNotification = null;
- }
- this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx);
- }
- }
-
- @Override
- void doComplete(boolean success) {
- readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
deleted file mode 100644
index 326f92b..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.stats;
-
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Stats logger to log exceptions happened in {@link com.twitter.distributedlog.readahead.ReadAheadWorker}.
- * They are counters of exceptions happened on each read ahead phase:
- * <code>`scope`/exceptions/`phase`/`code`</code>. `scope` is the current scope of
- * stats logger, `phase` is the read ahead phase, while `code` is the exception code. Check
- * {@link com.twitter.distributedlog.readahead.ReadAheadPhase} for details about phases and
- * {@link BKExceptionStatsLogger} for details about `code`.
- */
-public class ReadAheadExceptionsLogger {
-
- private final StatsLogger statsLogger;
- private StatsLogger parentExceptionStatsLogger;
- private final ConcurrentMap<String, BKExceptionStatsLogger> exceptionStatsLoggers =
- new ConcurrentHashMap<String, BKExceptionStatsLogger>();
-
- public ReadAheadExceptionsLogger(StatsLogger statsLogger) {
- this.statsLogger = statsLogger;
- }
-
- public BKExceptionStatsLogger getBKExceptionStatsLogger(String phase) {
- // initialize the parent exception stats logger lazily
- if (null == parentExceptionStatsLogger) {
- parentExceptionStatsLogger = statsLogger.scope("exceptions");
- }
- BKExceptionStatsLogger exceptionStatsLogger = exceptionStatsLoggers.get(phase);
- if (null == exceptionStatsLogger) {
- exceptionStatsLogger = new BKExceptionStatsLogger(parentExceptionStatsLogger.scope(phase));
- BKExceptionStatsLogger oldExceptionStatsLogger =
- exceptionStatsLoggers.putIfAbsent(phase, exceptionStatsLogger);
- if (null != oldExceptionStatsLogger) {
- exceptionStatsLogger = oldExceptionStatsLogger;
- }
- }
- return exceptionStatsLogger;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
deleted file mode 100644
index 1829e54..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.util.FutureUtils;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
-/**
- * Test {@link LedgerHandleCache}
- */
-public class TestLedgerHandleCache extends TestDistributedLogBase {
- static final Logger LOG = LoggerFactory.getLogger(TestLedgerHandleCache.class);
-
- protected static String ledgersPath = "/ledgers";
-
- private ZooKeeperClient zkc;
- private BookKeeperClient bkc;
-
- @Before
- public void setup() throws Exception {
- zkc = TestZooKeeperClientBuilder.newBuilder()
- .zkServers(zkServers)
- .build();
- bkc = BookKeeperClientBuilder.newBuilder()
- .name("bkc")
- .zkc(zkc)
- .ledgersPath(ledgersPath)
- .dlConfig(conf)
- .build();
- }
-
- @After
- public void teardown() throws Exception {
- bkc.close();
- zkc.close();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testBuilderWithoutBKC() throws Exception {
- LedgerHandleCache.newBuilder().build();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testBuilderWithoutStatsLogger() throws Exception {
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).statsLogger(null).build();
- }
-
- @Test(timeout = 60000, expected = BKException.BKBookieHandleNotAvailableException.class)
- public void testOpenLedgerWhenBkcClosed() throws Exception {
- BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder().name("newBkc")
- .zkc(zkc).ledgersPath(ledgersPath).dlConfig(conf).build();
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build();
- // closed the bkc
- newBkc.close();
- // open ledger after bkc closed.
- cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("", 2, 1, 1).setRegionId(1).build(), false);
- }
-
- @Test(timeout = 60000, expected = BKException.ZKException.class)
- public void testOpenLedgerWhenZkClosed() throws Exception {
- ZooKeeperClient newZkc = TestZooKeeperClientBuilder.newBuilder()
- .name("zkc-openledger-when-zk-closed")
- .zkServers(zkServers)
- .build();
- BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder()
- .name("bkc-openledger-when-zk-closed")
- .zkc(newZkc)
- .ledgersPath(ledgersPath)
- .dlConfig(conf)
- .build();
- try {
- LedgerHandle lh = newBkc.get().createLedger(BookKeeper.DigestType.CRC32, "zkcClosed".getBytes(UTF_8));
- lh.close();
- newZkc.close();
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build();
- // open ledger after zkc closed
- cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("",
- 2, lh.getId(), 1).setLogSegmentSequenceNo(lh.getId()).build(), false);
- } finally {
- newBkc.close();
- }
- }
-
- @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
- public void testReadLastConfirmedWithoutOpeningLedger() throws Exception {
- LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- // read last confirmed
- cache.tryReadLastConfirmed(desc);
- }
-
- @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
- public void testReadEntriesWithoutOpeningLedger() throws Exception {
- LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- // read entries
- cache.readEntries(desc, 0, 10);
- }
-
- @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
- public void testGetLastConfirmedWithoutOpeningLedger() throws Exception {
- LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- // read entries
- cache.getLastAddConfirmed(desc);
- }
-
- @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
- public void testReadLastConfirmedAndEntryWithoutOpeningLedger() throws Exception {
- LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- // read entries
- FutureUtils.bkResult(cache.asyncReadLastConfirmedAndEntry(desc, 1L, 200L, false));
- }
-
- @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class)
- public void testGetLengthWithoutOpeningLedger() throws Exception {
- LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false);
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- // read entries
- cache.getLength(desc);
- }
-
- @Test(timeout = 60000)
- public void testOpenAndCloseLedger() throws Exception {
- LedgerHandle lh = bkc.get().createLedger(1, 1, 1,
- BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
- LedgerHandleCache cache =
- LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build();
- LogSegmentMetadata segment = new LogSegmentMetadata.LogSegmentMetadataBuilder(
- "/data", LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID, lh.getId(), 0L)
- .build();
- LedgerDescriptor desc1 = cache.openLedger(segment, false);
- assertTrue(cache.handlesMap.containsKey(desc1));
- LedgerHandleCache.RefCountedLedgerHandle refLh = cache.handlesMap.get(desc1);
- assertEquals(1, refLh.getRefCount());
- cache.openLedger(segment, false);
- assertTrue(cache.handlesMap.containsKey(desc1));
- assertEquals(2, refLh.getRefCount());
- // close the ledger
- cache.closeLedger(desc1);
- assertTrue(cache.handlesMap.containsKey(desc1));
- assertEquals(1, refLh.getRefCount());
- cache.closeLedger(desc1);
- assertFalse(cache.handlesMap.containsKey(desc1));
- assertEquals(0, refLh.getRefCount());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
index 573ae5c..74a5231 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
@@ -99,7 +99,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
true);
LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
conf,
- bkc.get(),
+ bkc,
scheduler,
NullStatsLogger.INSTANCE,
AsyncFailureInjector.NULL);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
index 998c7ba..4358a8e 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
@@ -34,8 +34,6 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
import static org.junit.Assert.*;
@@ -52,42 +50,22 @@ public class TestReadUtils extends TestDistributedLogBase {
private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception {
List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments();
- final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
- .bkc(bkdlm.getWriterBKC())
- .conf(conf)
- .build();
return ReadUtils.getLogRecordNotLessThanTxId(
bkdlm.getStreamName(),
logSegments.get(logsegmentIdx),
transactionId,
Executors.newSingleThreadExecutor(),
- handleCache,
+ bkdlm.getReaderEntryStore(),
10
- ).ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- handleCache.clear();
- return BoxedUnit.UNIT;
- }
- });
+ );
}
private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments();
- final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
- .bkc(bkdlm.getWriterBKC())
- .conf(conf)
- .build();
return ReadUtils.asyncReadFirstUserRecord(
bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
- handleCache, dlsn
- ).ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- handleCache.clear();
- return BoxedUnit.UNIT;
- }
- });
+ bkdlm.getReaderEntryStore(), dlsn
+ );
}
private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
@@ -98,20 +76,10 @@ public class TestReadUtils extends TestDistributedLogBase {
LogSegmentFilter.DEFAULT_FILTER,
null)
).getValue();
- final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
- .bkc(bkdlm.getWriterBKC())
- .conf(conf)
- .build();
return ReadUtils.asyncReadLastRecord(
bkdlm.getStreamName(), ledgerList.get(ledgerNo), false, false, false, 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
- handleCache
- ).ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- handleCache.clear();
- return BoxedUnit.UNIT;
- }
- });
+ bkdlm.getReaderEntryStore()
+ );
}
@Test(timeout = 60000)