You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/28 01:05:25 UTC

[17/20] incubator-distributedlog git commit: DL-111: ReadAhead Cache should cache entries rather than records

DL-111: ReadAhead Cache should cache entries rather than records

Current readahead cache cache records. So it will be a lot of callbacks (function calls) when polling a record off the readahead cache. Most of the cpu cycles are unnecessarily spent
on function calls on polling records off the readahead cache. It is the throughput bottleneck for a DL reader.

This change is to change ReadAhead cache to cache entries rather than records. Defer the deserilization of records later on when the reader wants to access the records. It also make
the cache more efficient to reduce the memory footprint.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b5d44ccb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b5d44ccb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b5d44ccb

Branch: refs/heads/master
Commit: b5d44ccb6aa0ec633ff7fe5a870fa607ea71d92d
Parents: 6e507a3
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Nov 21 17:30:13 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 27 16:49:29 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    |  56 ++++++++-
 .../distributedlog/BKLogReadHandler.java        |   7 +-
 .../distributedlog/BKSyncLogReaderDLSN.java     |  12 +-
 .../twitter/distributedlog/ReadAheadCache.java  | 122 +++----------------
 .../readahead/ReadAheadTracker.java             |   2 +-
 .../readahead/ReadAheadWorker.java              |   3 -
 .../distributedlog/TestAsyncReaderWriter.java   |   8 +-
 7 files changed, 78 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 90230ae..e11d7a3 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -118,6 +118,10 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         }
     };
 
+    // State
+    private Entry.Reader currentEntry = null;
+    private LogRecordWithDLSN nextRecord = null;
+
     // Failure Injector
     private final AsyncFailureInjector failureInjector;
     private boolean disableProcessingReadRequests = false;
@@ -281,8 +285,14 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                     //     that means notification was missed between readahead and reader.
                     //   - cache is empty and readahead is idle (no records added for a long time)
                     idleReaderCheckIdleReadAheadCount.inc();
-                    if (cache.getNumCachedRecords() <= 0
-                            && !cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                    try {
+                        if (!hasMoreRecords(cache)
+                                && !cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                            return;
+                        }
+                    } catch (IOException e) {
+                        // we encountered exceptions on checking more records
+                        setLastException(e);
                         return;
                     }
 
@@ -488,6 +498,46 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         pendingRequests.clear();
     }
 
+    boolean hasMoreRecords() throws IOException {
+        return hasMoreRecords(bkLedgerManager.readAheadCache);
+    }
+
+    private synchronized boolean hasMoreRecords(ReadAheadCache cache) throws IOException {
+        if (cache.getNumCachedEntries() > 0 || null != nextRecord) {
+            return true;
+        } else if (null != currentEntry) {
+            nextRecord = currentEntry.nextRecord();
+            return null != nextRecord;
+        } else {
+            return false;
+        }
+    }
+
+    private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+        if (null == currentEntry) {
+            currentEntry = bkLedgerManager.getNextReadAheadEntry();
+            // no current entry after reading from read head then return null
+            if (null == currentEntry) {
+                return null;
+            }
+        }
+
+        LogRecordWithDLSN recordToReturn;
+        if (null == nextRecord) {
+            nextRecord = currentEntry.nextRecord();
+            // no more records in current entry
+            if (null == nextRecord) {
+                currentEntry = null;
+                return readNextRecord();
+            }
+        }
+
+        // found a record to return and prefetch the next one
+        recordToReturn = nextRecord;
+        nextRecord = currentEntry.nextRecord();
+        return recordToReturn;
+    }
+
     @Override
     public void run() {
         synchronized(scheduleLock) {
@@ -549,7 +599,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
                     while (!nextRequest.hasReadEnoughRecords()) {
                         // read single record
                         do {
-                            record = bkLedgerManager.getNextReadAheadRecord();
+                            record = readNextRecord();
                         } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
                         if (null == record) {
                             break;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index a1e29a2..faa47fc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -199,13 +199,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                 .build();
         readAheadCache = new ReadAheadCache(
                 getFullyQualifiedName(),
-                handlerStatsLogger,
                 alertStatsLogger,
                 readerStateNotification,
                 dynConf.getReadAheadMaxRecords(),
                 deserializeRecordSet,
-                conf.getTraceReadAheadDeliveryLatency(),
-                conf.getDataLatencyWarnThresholdMillis(),
                 Ticker.systemTicker());
 
         this.subscriberId = subscriberId;
@@ -481,8 +478,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         return promise;
     }
 
-    public LogRecordWithDLSN getNextReadAheadRecord() throws IOException {
-        return readAheadCache.getNextReadAheadRecord();
+    public Entry.Reader getNextReadAheadEntry() throws IOException {
+        return readAheadCache.getNextReadAheadEntry();
     }
 
     public ReadAheadCache getReadAheadCache() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index 28e69b2..bce12b6 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -51,7 +51,6 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
     private Promise<Void> closeFuture;
     private final Optional<Long> startTransactionId;
     private final DLSN startDLSN;
-    private volatile DLSN lastSeenDLSN = DLSN.InvalidDLSN;
     // lock on variables that would be accessed by both background threads and foreground threads
     private final Object sharedLock = new Object();
 
@@ -110,7 +109,6 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
 
     @Override
     public void onSuccess(LogRecordWithDLSN record) {
-        this.lastSeenDLSN = record.getDlsn();
         if (!startTransactionId.isPresent() || record.getTransactionId() >= startTransactionId.get()) {
             readAheadRecords.add(record);
         }
@@ -167,15 +165,7 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
                     if (null != record) {
                         break;
                     }
-                    DLSN lastDLSNSeenByReadAhead =
-                            reader.bkLedgerManager.readAheadCache.getLastReadAheadUserDLSN();
-
-                    // if last seen DLSN by reader is same as the one seen by ReadAhead
-                    // that means that reader is caught up with ReadAhead and ReadAhead
-                    // is caught up with stream
-                    shallWait = DLSN.InitialDLSN != lastDLSNSeenByReadAhead
-                            && lastSeenDLSN.compareTo(lastDLSNSeenByReadAhead) < 0
-                            && startDLSN.compareTo(lastDLSNSeenByReadAhead) <= 0;
+                    shallWait = reader.hasMoreRecords();
                 }
             } catch (InterruptedException e) {
                 throw new DLInterruptedException("Interrupted on waiting next available log record for stream "

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
index 58933e5..d6051f6 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java
@@ -20,7 +20,6 @@ package com.twitter.distributedlog;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Stopwatch;
@@ -30,8 +29,6 @@ import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
 import com.twitter.distributedlog.exceptions.LogReadException;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +36,8 @@ public class ReadAheadCache {
     static final Logger LOG = LoggerFactory.getLogger(ReadAheadCache.class);
 
     private final String streamName;
-    private final LinkedBlockingQueue<LogRecordWithDLSN> readAheadRecords;
-    private final int maxCachedRecords;
-    private final AtomicReference<DLSN> minActiveDLSN = new AtomicReference<DLSN>(DLSN.NonInclusiveLowerBound);
-    private DLSN lastReadAheadDLSN = DLSN.InvalidDLSN;
-    private DLSN lastReadAheadUserDLSN = DLSN.InvalidDLSN;
+    private final LinkedBlockingQueue<Entry.Reader> readAheadEntries;
+    private final int maxCachedEntries;
     private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
     private final boolean deserializeRecordSet;
     // callbacks
@@ -53,53 +47,27 @@ public class ReadAheadCache {
     // variables for idle reader detection
     private final Stopwatch lastEntryProcessTime;
 
-    // Stats
-    private final AtomicLong cacheBytes = new AtomicLong(0);
-
     private final AlertStatsLogger alertStatsLogger;
-    private final StatsLogger statsLogger;
-    private final OpStatsLogger readAheadDeliveryLatencyStat;
-    private final OpStatsLogger negativeReadAheadDeliveryLatencyStat;
-    // Flags on controlling delivery latency stats collection
-    private final boolean traceDeliveryLatencyEnabled;
-    private volatile boolean suppressDeliveryLatency = true;
-    private final long deliveryLatencyWarnThresholdMillis;
 
     public ReadAheadCache(String streamName,
-                          StatsLogger statsLogger,
                           AlertStatsLogger alertStatsLogger,
                           AsyncNotification notification,
                           int maxCachedRecords,
                           boolean deserializeRecordSet,
-                          boolean traceDeliveryLatencyEnabled,
-                          long deliveryLatencyWarnThresholdMillis,
                           Ticker ticker) {
         this.streamName = streamName;
-        this.maxCachedRecords = maxCachedRecords;
+        this.maxCachedEntries = maxCachedRecords;
         this.notification = notification;
         this.deserializeRecordSet = deserializeRecordSet;
 
         // create the readahead queue
-        readAheadRecords = new LinkedBlockingQueue<LogRecordWithDLSN>();
+        readAheadEntries = new LinkedBlockingQueue<Entry.Reader>();
 
         // start the idle reader detection
         lastEntryProcessTime = Stopwatch.createStarted(ticker);
 
-        // Flags to control delivery latency tracing
-        this.traceDeliveryLatencyEnabled = traceDeliveryLatencyEnabled;
-        this.deliveryLatencyWarnThresholdMillis = deliveryLatencyWarnThresholdMillis;
         // Stats
-        StatsLogger readAheadStatsLogger = statsLogger.scope("readahead");
-        this.statsLogger = readAheadStatsLogger;
         this.alertStatsLogger = alertStatsLogger;
-        this.readAheadDeliveryLatencyStat =
-                readAheadStatsLogger.getOpStatsLogger("delivery_latency");
-        this.negativeReadAheadDeliveryLatencyStat =
-                readAheadStatsLogger.getOpStatsLogger("negative_delivery_latency");
-    }
-
-    DLSN getLastReadAheadUserDLSN() {
-        return lastReadAheadUserDLSN;
     }
 
     /**
@@ -133,26 +101,25 @@ public class ReadAheadCache {
     }
 
     /**
-     * Poll next record from the readahead queue.
+     * Poll next entry from the readahead queue.
      *
-     * @return next record from readahead queue. null if no records available in the queue.
+     * @return next entry from readahead queue. null if no entries available in the queue.
      * @throws IOException
      */
-    public LogRecordWithDLSN getNextReadAheadRecord() throws IOException {
+    public Entry.Reader getNextReadAheadEntry() throws IOException {
         if (null != lastException.get()) {
             throw lastException.get();
         }
 
-        LogRecordWithDLSN record = readAheadRecords.poll();
+        Entry.Reader entry = readAheadEntries.poll();
 
-        if (null != record) {
-            cacheBytes.addAndGet(-record.getPayload().length);
+        if (null != entry) {
             if (!isCacheFull()) {
                 invokeReadAheadCallback();
             }
         }
 
-        return record;
+        return entry;
     }
 
     /**
@@ -196,7 +163,7 @@ public class ReadAheadCache {
     }
 
     public boolean isCacheFull() {
-        return getNumCachedRecords() >= maxCachedRecords;
+        return getNumCachedEntries() >= maxCachedEntries;
     }
 
     /**
@@ -204,25 +171,8 @@ public class ReadAheadCache {
      *
      * @return number cached records.
      */
-    public int getNumCachedRecords() {
-        return readAheadRecords.size();
-    }
-
-    /**
-     * Return number cached bytes.
-     *
-     * @return number cached bytes.
-     */
-    public long getNumCachedBytes() {
-        return cacheBytes.get();
-    }
-
-    public void setSuppressDeliveryLatency(boolean suppressed) {
-        this.suppressDeliveryLatency = suppressed;
-    }
-
-    public void setMinActiveDLSN(DLSN minActiveDLSN) {
-        this.minActiveDLSN.set(minActiveDLSN);
+    public int getNumCachedEntries() {
+        return readAheadEntries.size();
     }
 
     /**
@@ -252,44 +202,7 @@ public class ReadAheadCache {
                     .deserializeRecordSet(deserializeRecordSet)
                     .setInputStream(ledgerEntry.getEntryInputStream())
                     .buildReader();
-            while(true) {
-                LogRecordWithDLSN record = reader.nextRecord();
-
-                if (null == record) {
-                    break;
-                }
-
-                if (lastReadAheadDLSN.compareTo(record.getDlsn()) >= 0) {
-                    LOG.error("Out of order reads last {} : curr {}", lastReadAheadDLSN, record.getDlsn());
-                    throw new LogReadException("Out of order reads");
-                }
-                lastReadAheadDLSN = record.getDlsn();
-
-                if (record.isControl()) {
-                    continue;
-                }
-                lastReadAheadUserDLSN = lastReadAheadDLSN;
-
-                if (minActiveDLSN.get().compareTo(record.getDlsn()) > 0) {
-                    continue;
-                }
-
-                if (traceDeliveryLatencyEnabled && !suppressDeliveryLatency) {
-                    long currentMs = System.currentTimeMillis();
-                    long deliveryMs = currentMs - record.getTransactionId();
-                    if (deliveryMs >= 0) {
-                        readAheadDeliveryLatencyStat.registerSuccessfulEvent(deliveryMs);
-                    } else {
-                        negativeReadAheadDeliveryLatencyStat.registerSuccessfulEvent(-deliveryMs);
-                    }
-                    if (deliveryMs > deliveryLatencyWarnThresholdMillis) {
-                        LOG.warn("Record {} for stream {} took long time to deliver : publish time = {}, available time = {}, delivery time = {}, reason = {}.",
-                                 new Object[] { record.getDlsn(), streamName, record.getTransactionId(), currentMs, deliveryMs, reason });
-                    }
-                }
-                readAheadRecords.add(record);
-                cacheBytes.addAndGet(record.getPayload().length);
-            }
+            readAheadEntries.add(reader);
         } catch (InvalidEnvelopedEntryException ieee) {
             alertStatsLogger.raise("Found invalid enveloped entry on stream {} : ", streamName, ieee);
             setLastException(ieee);
@@ -299,13 +212,12 @@ public class ReadAheadCache {
     }
 
     public void clear() {
-        readAheadRecords.clear();
-        cacheBytes.set(0L);
+        readAheadEntries.clear();
     }
 
     @Override
     public String toString() {
-        return String.format("%s: Cache Bytes: %d, Num Cached Records: %d",
-            streamName, cacheBytes.get(), getNumCachedRecords());
+        return String.format("%s: Num Cached Entries: %d",
+            streamName, getNumCachedEntries());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
index 39a627f..a58218b 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
@@ -81,7 +81,7 @@ public class ReadAheadTracker {
 
             @Override
             public Number getSample() {
-                return cache.getNumCachedRecords();
+                return cache.getNumCachedEntries();
             }
         };
         this.statsLogger.registerGauge(cachEntriesGaugeLabel, cacheEntriesGauge);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 83a34a3..9a1911e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -692,7 +692,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                             long startBKEntry = 0;
                             if (l.isPartiallyTruncated() && !conf.getIgnoreTruncationStatus()) {
                                 startBKEntry = l.getMinActiveDLSN().getEntryId();
-                                readAheadCache.setMinActiveDLSN(l.getMinActiveDLSN());
                             }
 
                             if(l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()) {
@@ -743,7 +742,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                     if (null == currentMetadata) {
                         if (isCatchingUp) {
                             isCatchingUp = false;
-                            readAheadCache.setSuppressDeliveryLatency(false);
                             if (isHandleForReading) {
                                 LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.",
                                          new Object[] { fullyQualifiedName, System.currentTimeMillis(), nextReadAheadPosition });
@@ -945,7 +943,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                         // the readahead is caught up if current ledger is in progress and read position moves over last add confirmed
                         if (isCatchingUp) {
                             isCatchingUp = false;
-                            readAheadCache.setSuppressDeliveryLatency(false);
                             if (isHandleForReading) {
                                 LOG.info("{} caught up at {}: lac = {}, position = {}.",
                                          new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition });

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b5d44ccb/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index e5063cc..28f7a74 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -1618,17 +1618,17 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         assertEquals(1L, record.getTransactionId());
 
         assertNotNull(reader.bkLedgerManager.readAheadWorker);
-        assertTrue(reader.bkLedgerManager.readAheadCache.getNumCachedRecords() <= maxAllowedCachedRecords);
+        assertTrue(reader.bkLedgerManager.readAheadCache.getNumCachedEntries() <= maxAllowedCachedRecords);
 
         for (int i = 2; i <= numRecords; i++) {
             record = Await.result(reader.readNext());
             LOG.info("Read record {}", record);
             assertEquals((long) i, record.getTransactionId());
             TimeUnit.MILLISECONDS.sleep(20);
-            int numCachedRecords = reader.bkLedgerManager.readAheadCache.getNumCachedRecords();
+            int numCachedEntries = reader.bkLedgerManager.readAheadCache.getNumCachedEntries();
             assertTrue("Should cache less than " + batchSize + " records but already found "
-                    + numCachedRecords + " records when reading " + i + "th record",
-                    numCachedRecords <= maxAllowedCachedRecords);
+                    + numCachedEntries + " records when reading " + i + "th record",
+                    numCachedEntries <= maxAllowedCachedRecords);
         }
     }