You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:30 UTC
[16/31] incubator-distributedlog git commit: DL-160: Remove 'DLSN' suffix from async and sync readers
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
deleted file mode 100644
index adf49a1..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Synchronous Log Reader based on {@link AsyncLogReader}
- */
-class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
-
- private final BKDistributedLogManager bkdlm;
- private final BKLogReadHandler readHandler;
- private final AtomicReference<IOException> readerException =
- new AtomicReference<IOException>(null);
- private final int maxReadAheadWaitTime;
- private Promise<Void> closeFuture;
- private final Optional<Long> startTransactionId;
- private boolean positioned = false;
- private Entry.Reader currentEntry = null;
-
- // readahead reader
- ReadAheadEntryReader readAheadReader = null;
-
- // idle reader settings
- private final boolean shouldCheckIdleReader;
- private final int idleErrorThresholdMillis;
-
- // Stats
- private final Counter idleReaderError;
-
- BKSyncLogReaderDLSN(DistributedLogConfiguration conf,
- BKDistributedLogManager bkdlm,
- DLSN startDLSN,
- Optional<Long> startTransactionId,
- StatsLogger statsLogger) throws IOException {
- this.bkdlm = bkdlm;
- this.readHandler = bkdlm.createReadHandler(
- Optional.<String>absent(),
- this,
- true);
- this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
- this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
- this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
- this.startTransactionId = startTransactionId;
-
- // start readahead
- startReadAhead(startDLSN);
- if (!startTransactionId.isPresent()) {
- positioned = true;
- }
-
- // Stats
- StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
- idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
- }
-
- private void startReadAhead(DLSN startDLSN) throws IOException {
- readAheadReader = new ReadAheadEntryReader(
- bkdlm.getStreamName(),
- startDLSN,
- bkdlm.getConf(),
- readHandler,
- bkdlm.getReaderEntryStore(),
- bkdlm.getScheduler(),
- Ticker.systemTicker(),
- bkdlm.alertStatsLogger);
- readHandler.registerListener(readAheadReader);
- readHandler.asyncStartFetchLogSegments()
- .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
- readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this);
- readAheadReader.start(logSegments.getValue());
- return BoxedUnit.UNIT;
- }
- });
- }
-
- @VisibleForTesting
- ReadAheadEntryReader getReadAheadReader() {
- return readAheadReader;
- }
-
- @VisibleForTesting
- BKLogReadHandler getReadHandler() {
- return readHandler;
- }
-
- private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
- Entry.Reader entry = null;
- if (nonBlocking) {
- return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
- } else {
- while (!readAheadReader.isReadAheadCaughtUp()
- && null == readerException.get()
- && null == entry) {
- entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
- }
- if (null != entry) {
- return entry;
- }
- // reader is caught up
- if (readAheadReader.isReadAheadCaughtUp()
- && null == readerException.get()) {
- entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
- }
- return entry;
- }
- }
-
- private void markReaderAsIdle() throws IdleReaderException {
- idleReaderError.inc();
- IdleReaderException ire = new IdleReaderException("Sync reader on stream "
- + readHandler.getFullyQualifiedName()
- + " is idle for more than " + idleErrorThresholdMillis + " ms");
- readerException.compareAndSet(null, ire);
- throw ire;
- }
-
- @Override
- public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
- throws IOException {
- if (null != readerException.get()) {
- throw readerException.get();
- }
- LogRecordWithDLSN record = doReadNext(nonBlocking);
- // no record is returned, check if the reader becomes idle
- if (null == record && shouldCheckIdleReader) {
- if (readAheadReader.getNumCachedEntries() <= 0 &&
- readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
- markReaderAsIdle();
- }
- }
- return record;
- }
-
- private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
- LogRecordWithDLSN record = null;
-
- do {
- // fetch one record until we don't find any entry available in the readahead cache
- while (null == record) {
- if (null == currentEntry) {
- currentEntry = readNextEntry(nonBlocking);
- if (null == currentEntry) {
- return null;
- }
- }
- record = currentEntry.nextRecord();
- if (null == record) {
- currentEntry = null;
- }
- }
-
- // check if we reached the end of stream
- if (record.isEndOfStream()) {
- EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
- + readHandler.getFullyQualifiedName());
- readerException.compareAndSet(null, eos);
- throw eos;
- }
- // skip control records
- if (record.isControl()) {
- record = null;
- continue;
- }
- if (!positioned) {
- if (record.getTransactionId() < startTransactionId.get()) {
- record = null;
- continue;
- } else {
- positioned = true;
- break;
- }
- } else {
- break;
- }
- } while (true);
- return record;
- }
-
- @Override
- public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
- throws IOException {
- LinkedList<LogRecordWithDLSN> retList =
- new LinkedList<LogRecordWithDLSN>();
-
- int numRead = 0;
- LogRecordWithDLSN record = readNext(nonBlocking);
- while ((null != record)) {
- retList.add(record);
- numRead++;
- if (numRead >= numLogRecords) {
- break;
- }
- record = readNext(nonBlocking);
- }
- return retList;
- }
-
- @Override
- public Future<Void> asyncClose() {
- Promise<Void> closePromise;
- synchronized (this) {
- if (null != closeFuture) {
- return closeFuture;
- }
- closeFuture = closePromise = new Promise<Void>();
- }
- readHandler.unregisterListener(readAheadReader);
- readAheadReader.removeStateChangeNotification(this);
- Utils.closeSequence(bkdlm.getScheduler(), true,
- readAheadReader,
- readHandler
- ).proxyTo(closePromise);
- return closePromise;
- }
-
- @Override
- public void close() throws IOException {
- FutureUtils.result(asyncClose());
- }
-
- //
- // Notification From ReadHandler
- //
-
- @Override
- public void notifyOnError(Throwable cause) {
- if (cause instanceof IOException) {
- readerException.compareAndSet(null, (IOException) cause);
- } else {
- readerException.compareAndSet(null, new IOException(cause));
- }
- }
-
- @Override
- public void notifyOnOperationComplete() {
- // no-op
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml
index 684b827..c07fad9 100644
--- a/distributedlog-core/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-core/src/main/resources/findbugsExclude.xml
@@ -33,7 +33,7 @@
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
- <Class name="com.twitter.distributedlog.BKAsyncLogReaderDLSN" />
+ <Class name="com.twitter.distributedlog.BKAsyncLogReader" />
<Method name="run" />
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
</Match>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
index c34eb6e..3a1ab88 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
@@ -46,15 +46,15 @@ class NonBlockingReadsTestUtil {
boolean forceStall,
long segmentSize,
boolean waitForIdle) throws Exception {
- BKSyncLogReaderDLSN reader = null;
+ BKSyncLogReader reader = null;
try {
- reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1);
+ reader = (BKSyncLogReader) dlm.getInputStream(1);
} catch (LogNotFoundException lnfe) {
}
while (null == reader) {
TimeUnit.MILLISECONDS.sleep(20);
try {
- reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1);
+ reader = (BKSyncLogReader) dlm.getInputStream(1);
} catch (LogNotFoundException lnfe) {
} catch (LogEmptyException lee) {
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
index 9927616..a6cffbb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
@@ -75,7 +75,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
writer.closeAndComplete();
Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
- BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(futureReader1);
+ BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(futureReader1);
LogRecordWithDLSN record = Await.result(reader1.readNext());
assertEquals(1L, record.getTransactionId());
assertEquals(0L, record.getSequenceId());
@@ -542,7 +542,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
writer.closeAndComplete();
}
- BKAsyncLogReaderDLSN reader0 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+ BKAsyncLogReader reader0 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN());
long numTxns = 0;
LogRecordWithDLSN record = Await.result(reader0.readNext());
@@ -562,7 +562,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore();
Await.result(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN));
- BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+ BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
assertEquals(readDLSN, reader1.getStartDLSN());
numTxns = 0;
long startTxID = 10L;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 95d760e..65507ac 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -1397,7 +1397,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}, 0, TimeUnit.MILLISECONDS);
latch.await();
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
if (simulateReaderStall) {
reader.disableProcessingReadRequests();
}
@@ -1538,7 +1538,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}, 0, TimeUnit.MILLISECONDS);
latch.await();
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader)dlm.getAsyncLogReader(DLSN.InitialDLSN);
reader.disableReadAheadLogSegmentsNotification();
boolean exceptionEncountered = false;
int recordCount = 0;
@@ -1616,7 +1616,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
record.setControl();
Await.result(writer.write(record));
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
record = Await.result(reader.readNext());
LOG.info("Read record {}", record);
assertEquals(1L, record.getTransactionId());
@@ -1668,7 +1668,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
} catch (EndOfStreamException ex) {
}
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
LogRecord record = null;
for (int j = 0; j < NUM_RECORDS; j++) {
record = Await.result(reader.readNext());
@@ -1702,7 +1702,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}
writer.close();
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
try {
LogRecord record = Await.result(reader.readNext());
fail("Should have thrown");
@@ -1727,7 +1727,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
controlRecord.setControl();
FutureUtils.result(writer.write(controlRecord));
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Future<LogRecordWithDLSN> readFuture = reader.readNext();
@@ -1772,7 +1772,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
controlRecord.setControl();
FutureUtils.result(writer.write(controlRecord));
- BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+ BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
Future<LogRecordWithDLSN> readFuture = reader.readNext();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
index 54177c8..d28b62c 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
@@ -159,7 +159,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
// all 10 records are added to the stream
// then open a reader to read
- BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+ BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
// wait until readahead caught up
while (!reader.getReadAheadReader().isReadAheadCaughtUp()) {
@@ -226,7 +226,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
logger.info("Write first 10 records");
// open a reader to read
- BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+ BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
// resume reading from sync reader. so it should be able to read all 10 records
// and return null to claim it as caughtup
LogRecord record = reader.readNext(false);
@@ -283,7 +283,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
}, 0, 400, TimeUnit.MILLISECONDS);
// open a reader to read
- BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+ BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
// resume reading from sync reader. so it should be able to read all 10 records
// and return null to claim it as caughtup
LogRecord record = reader.readNext(false);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
index 3f47337..bfa9156 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -99,7 +99,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
DistributedLogManager dlmread = createNewDLM(conf, name);
- BKSyncLogReaderDLSN reader0 = (BKSyncLogReaderDLSN) dlmread.getInputStream(0);
+ BKSyncLogReader reader0 = (BKSyncLogReader) dlmread.getInputStream(0);
try {
ReaderThread[] readerThreads = new ReaderThread[1];
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
index 1739b47..6040549 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
@@ -118,7 +118,7 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
try {
AsyncLogReader reader = dlm.getAsyncLogReader(dlsn);
if (simulateErrors) {
- ((BKAsyncLogReaderDLSN) reader).simulateErrors();
+ ((BKAsyncLogReader) reader).simulateErrors();
}
nextDLSN = dlsn;
LOG.info("Positioned reader {} at {}", readerName, dlsn);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index ee53362..99ef041 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -312,7 +312,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
}
private void checkAndWaitWriterReaderPosition(BKLogSegmentWriter writer, long expectedWriterPosition,
- BKAsyncLogReaderDLSN reader, long expectedReaderPosition,
+ BKAsyncLogReader reader, long expectedReaderPosition,
LedgerHandle inspector, long expectedLac) throws Exception {
while (getLedgerHandle(writer).getLastAddConfirmed() < expectedWriterPosition) {
Thread.sleep(1000);
@@ -357,7 +357,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
}
BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name);
- final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
+ final BKAsyncLogReader reader = (BKAsyncLogReader) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
// 2) reader should be able to read 5 entries.
for (long i = 1; i <= numEntries; i++) {