You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:30 UTC

[16/31] incubator-distributedlog git commit: DL-160: Remove 'DLSN' suffix from async and sync readers

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
deleted file mode 100644
index adf49a1..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Synchronous Log Reader based on {@link AsyncLogReader}
- */
-class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
-
-    private final BKDistributedLogManager bkdlm;
-    private final BKLogReadHandler readHandler;
-    private final AtomicReference<IOException> readerException =
-            new AtomicReference<IOException>(null);
-    private final int maxReadAheadWaitTime;
-    private Promise<Void> closeFuture;
-    private final Optional<Long> startTransactionId;
-    private boolean positioned = false;
-    private Entry.Reader currentEntry = null;
-
-    // readahead reader
-    ReadAheadEntryReader readAheadReader = null;
-
-    // idle reader settings
-    private final boolean shouldCheckIdleReader;
-    private final int idleErrorThresholdMillis;
-
-    // Stats
-    private final Counter idleReaderError;
-
-    BKSyncLogReaderDLSN(DistributedLogConfiguration conf,
-                        BKDistributedLogManager bkdlm,
-                        DLSN startDLSN,
-                        Optional<Long> startTransactionId,
-                        StatsLogger statsLogger) throws IOException {
-        this.bkdlm = bkdlm;
-        this.readHandler = bkdlm.createReadHandler(
-                Optional.<String>absent(),
-                this,
-                true);
-        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
-        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
-        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
-        this.startTransactionId = startTransactionId;
-
-        // start readahead
-        startReadAhead(startDLSN);
-        if (!startTransactionId.isPresent()) {
-            positioned = true;
-        }
-
-        // Stats
-        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
-        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
-    }
-
-    private void startReadAhead(DLSN startDLSN) throws IOException {
-        readAheadReader = new ReadAheadEntryReader(
-                    bkdlm.getStreamName(),
-                    startDLSN,
-                    bkdlm.getConf(),
-                    readHandler,
-                    bkdlm.getReaderEntryStore(),
-                    bkdlm.getScheduler(),
-                    Ticker.systemTicker(),
-                    bkdlm.alertStatsLogger);
-        readHandler.registerListener(readAheadReader);
-        readHandler.asyncStartFetchLogSegments()
-                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                        readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this);
-                        readAheadReader.start(logSegments.getValue());
-                        return BoxedUnit.UNIT;
-                    }
-                });
-    }
-
-    @VisibleForTesting
-    ReadAheadEntryReader getReadAheadReader() {
-        return readAheadReader;
-    }
-
-    @VisibleForTesting
-    BKLogReadHandler getReadHandler() {
-        return readHandler;
-    }
-
-    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
-        Entry.Reader entry = null;
-        if (nonBlocking) {
-            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
-        } else {
-            while (!readAheadReader.isReadAheadCaughtUp()
-                    && null == readerException.get()
-                    && null == entry) {
-                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
-            }
-            if (null != entry) {
-                return entry;
-            }
-            // reader is caught up
-            if (readAheadReader.isReadAheadCaughtUp()
-                    && null == readerException.get()) {
-                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
-            }
-            return entry;
-        }
-    }
-
-    private void markReaderAsIdle() throws IdleReaderException {
-        idleReaderError.inc();
-        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
-                + readHandler.getFullyQualifiedName()
-                + " is idle for more than " + idleErrorThresholdMillis + " ms");
-        readerException.compareAndSet(null, ire);
-        throw ire;
-    }
-
-    @Override
-    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
-            throws IOException {
-        if (null != readerException.get()) {
-            throw readerException.get();
-        }
-        LogRecordWithDLSN record = doReadNext(nonBlocking);
-        // no record is returned, check if the reader becomes idle
-        if (null == record && shouldCheckIdleReader) {
-            if (readAheadReader.getNumCachedEntries() <= 0 &&
-                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
-                markReaderAsIdle();
-            }
-        }
-        return record;
-    }
-
-    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
-        LogRecordWithDLSN record = null;
-
-        do {
-            // fetch one record until we don't find any entry available in the readahead cache
-            while (null == record) {
-                if (null == currentEntry) {
-                    currentEntry = readNextEntry(nonBlocking);
-                    if (null == currentEntry) {
-                        return null;
-                    }
-                }
-                record = currentEntry.nextRecord();
-                if (null == record) {
-                    currentEntry = null;
-                }
-            }
-
-            // check if we reached the end of stream
-            if (record.isEndOfStream()) {
-                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
-                        + readHandler.getFullyQualifiedName());
-                readerException.compareAndSet(null, eos);
-                throw eos;
-            }
-            // skip control records
-            if (record.isControl()) {
-                record = null;
-                continue;
-            }
-            if (!positioned) {
-                if (record.getTransactionId() < startTransactionId.get()) {
-                    record = null;
-                    continue;
-                } else {
-                    positioned = true;
-                    break;
-                }
-            } else {
-                break;
-            }
-        } while (true);
-        return record;
-    }
-
-    @Override
-    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
-            throws IOException {
-        LinkedList<LogRecordWithDLSN> retList =
-                new LinkedList<LogRecordWithDLSN>();
-
-        int numRead = 0;
-        LogRecordWithDLSN record = readNext(nonBlocking);
-        while ((null != record)) {
-            retList.add(record);
-            numRead++;
-            if (numRead >= numLogRecords) {
-                break;
-            }
-            record = readNext(nonBlocking);
-        }
-        return retList;
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closePromise;
-        synchronized (this) {
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        readHandler.unregisterListener(readAheadReader);
-        readAheadReader.removeStateChangeNotification(this);
-        Utils.closeSequence(bkdlm.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
-        return closePromise;
-    }
-
-    @Override
-    public void close() throws IOException {
-        FutureUtils.result(asyncClose());
-    }
-
-    //
-    // Notification From ReadHandler
-    //
-
-    @Override
-    public void notifyOnError(Throwable cause) {
-        if (cause instanceof IOException) {
-            readerException.compareAndSet(null, (IOException) cause);
-        } else {
-            readerException.compareAndSet(null, new IOException(cause));
-        }
-    }
-
-    @Override
-    public void notifyOnOperationComplete() {
-        // no-op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml
index 684b827..c07fad9 100644
--- a/distributedlog-core/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-core/src/main/resources/findbugsExclude.xml
@@ -33,7 +33,7 @@
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
   <Match>
-    <Class name="com.twitter.distributedlog.BKAsyncLogReaderDLSN" />
+    <Class name="com.twitter.distributedlog.BKAsyncLogReader" />
     <Method name="run" />
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
   </Match>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
index c34eb6e..3a1ab88 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
@@ -46,15 +46,15 @@ class NonBlockingReadsTestUtil {
                                 boolean forceStall,
                                 long segmentSize,
                                 boolean waitForIdle) throws Exception {
-        BKSyncLogReaderDLSN reader = null;
+        BKSyncLogReader reader = null;
         try {
-            reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1);
+            reader = (BKSyncLogReader) dlm.getInputStream(1);
         } catch (LogNotFoundException lnfe) {
         }
         while (null == reader) {
             TimeUnit.MILLISECONDS.sleep(20);
             try {
-                reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1);
+                reader = (BKSyncLogReader) dlm.getInputStream(1);
             } catch (LogNotFoundException lnfe) {
             } catch (LogEmptyException lee) {
             } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
index 9927616..a6cffbb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
@@ -75,7 +75,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
         writer.closeAndComplete();
 
         Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(futureReader1);
+        BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(futureReader1);
         LogRecordWithDLSN record = Await.result(reader1.readNext());
         assertEquals(1L, record.getTransactionId());
         assertEquals(0L, record.getSequenceId());
@@ -542,7 +542,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
             writer.closeAndComplete();
         }
 
-        BKAsyncLogReaderDLSN reader0 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+        BKAsyncLogReader reader0 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
         assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN());
         long numTxns = 0;
         LogRecordWithDLSN record = Await.result(reader0.readNext());
@@ -562,7 +562,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
 
         SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore();
         Await.result(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN));
-        BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+        BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
         assertEquals(readDLSN, reader1.getStartDLSN());
         numTxns = 0;
         long startTxID =  10L;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 95d760e..65507ac 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -1397,7 +1397,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }, 0, TimeUnit.MILLISECONDS);
 
         latch.await();
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         if (simulateReaderStall) {
             reader.disableProcessingReadRequests();
         }
@@ -1538,7 +1538,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 }, 0, TimeUnit.MILLISECONDS);
 
         latch.await();
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader)dlm.getAsyncLogReader(DLSN.InitialDLSN);
         reader.disableReadAheadLogSegmentsNotification();
         boolean exceptionEncountered = false;
         int recordCount = 0;
@@ -1616,7 +1616,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         record.setControl();
         Await.result(writer.write(record));
 
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         record = Await.result(reader.readNext());
         LOG.info("Read record {}", record);
         assertEquals(1L, record.getTransactionId());
@@ -1668,7 +1668,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         } catch (EndOfStreamException ex) {
         }
 
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         LogRecord record = null;
         for (int j = 0; j < NUM_RECORDS; j++) {
             record = Await.result(reader.readNext());
@@ -1702,7 +1702,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         }
         writer.close();
 
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         try {
             LogRecord record = Await.result(reader.readNext());
             fail("Should have thrown");
@@ -1727,7 +1727,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         controlRecord.setControl();
         FutureUtils.result(writer.write(controlRecord));
 
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         Future<LogRecordWithDLSN> readFuture = reader.readNext();
 
@@ -1772,7 +1772,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         controlRecord.setControl();
         FutureUtils.result(writer.write(controlRecord));
 
-        BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
+        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
         Future<LogRecordWithDLSN> readFuture = reader.readNext();
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
index 54177c8..d28b62c 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
@@ -159,7 +159,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
 
         // all 10 records are added to the stream
         // then open a reader to read
-        BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+        BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
 
         // wait until readahead caught up
         while (!reader.getReadAheadReader().isReadAheadCaughtUp()) {
@@ -226,7 +226,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         logger.info("Write first 10 records");
 
         // open a reader to read
-        BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+        BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
         // resume reading from sync reader. so it should be able to read all 10 records
         // and return null to claim it as caughtup
         LogRecord record = reader.readNext(false);
@@ -283,7 +283,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         }, 0, 400, TimeUnit.MILLISECONDS);
 
         // open a reader to read
-        BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
+        BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L);
         // resume reading from sync reader. so it should be able to read all 10 records
         // and return null to claim it as caughtup
         LogRecord record = reader.readNext(false);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
index 3f47337..bfa9156 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -99,7 +99,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
 
         DistributedLogManager dlmread = createNewDLM(conf, name);
 
-        BKSyncLogReaderDLSN reader0 = (BKSyncLogReaderDLSN) dlmread.getInputStream(0);
+        BKSyncLogReader reader0 = (BKSyncLogReader) dlmread.getInputStream(0);
 
         try {
             ReaderThread[] readerThreads = new ReaderThread[1];

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
index 1739b47..6040549 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
@@ -118,7 +118,7 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
                 try {
                     AsyncLogReader reader = dlm.getAsyncLogReader(dlsn);
                     if (simulateErrors) {
-                        ((BKAsyncLogReaderDLSN) reader).simulateErrors();
+                        ((BKAsyncLogReader) reader).simulateErrors();
                     }
                     nextDLSN = dlsn;
                     LOG.info("Positioned reader {} at {}", readerName, dlsn);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index ee53362..99ef041 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -312,7 +312,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
     }
 
     private void checkAndWaitWriterReaderPosition(BKLogSegmentWriter writer, long expectedWriterPosition,
-                                                  BKAsyncLogReaderDLSN reader, long expectedReaderPosition,
+                                                  BKAsyncLogReader reader, long expectedReaderPosition,
                                                   LedgerHandle inspector, long expectedLac) throws Exception {
         while (getLedgerHandle(writer).getLastAddConfirmed() < expectedWriterPosition) {
             Thread.sleep(1000);
@@ -357,7 +357,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         }
 
         BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name);
-        final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
+        final BKAsyncLogReader reader = (BKAsyncLogReader) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
 
         // 2) reader should be able to read 5 entries.
         for (long i = 1; i <= numEntries; i++) {