Posted to commits@activemq.apache.org by ch...@apache.org on 2013/12/04 18:09:02 UTC

[1/2] git commit: Fixing a bug where DataByteArrayInputStream was not throwing an EOFException when reads went past the end of the byte array.

Updated Branches:
  refs/heads/trunk 69e35d6c4 -> 02ef9445d


Fixing a bug where DataByteArrayInputStream was not throwing an EOFException when reads went past the end of the byte array.

Silently returning -1 from reads past the end of the buffer made it harder to figure out when things had gone wrong.
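
As a quick illustration (the two-byte buffer and the EofSketch class name are made up for this example; the constructor, readInt() and the new EOFException behaviour come from the patch below), a minimal sketch of how a caller now sees a short read:

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.activemq.util.DataByteArrayInputStream;

    public class EofSketch {
        public static void main(String[] args) throws IOException {
            // Only two bytes in the backing array, but readInt() needs four.
            DataByteArrayInputStream in =
                    new DataByteArrayInputStream(new byte[] { 0x01, 0x02 });
            try {
                in.readInt();
            } catch (EOFException expected) {
                // With this fix the short read is reported immediately instead of
                // being folded into the return value as stray -1 bytes.
                System.out.println("readInt() past the end: " + expected);
            }
        }
    }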


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/02ef9445
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/02ef9445
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/02ef9445

Branch: refs/heads/trunk
Commit: 02ef9445dddcffc5f701dcb43ef0fa3d6acbc307
Parents: 8378cb1
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Wed Dec 4 12:08:28 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Wed Dec 4 12:09:47 2013 -0500

----------------------------------------------------------------------
 .../activemq/command/XATransactionId.java       |  4 +-
 .../activemq/util/DataByteArrayInputStream.java | 58 +++++++++++---------
 2 files changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/02ef9445/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
index e963c87..5f786e5 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
@@ -50,7 +50,7 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
         this.branchQualifier = xid.getBranchQualifier();
     }
 
-    public XATransactionId(byte[] encodedBytes) {
+    public XATransactionId(byte[] encodedBytes) throws IOException {
         encodedXidBytes = encodedBytes;
         initFromEncodedBytes();
     }
@@ -61,7 +61,7 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
 
     final int XID_PREFIX_SIZE = 16;
     //+|-,(long)lastAck,(byte)priority,(int)formatid,(short)globalLength....
-    private void initFromEncodedBytes() {
+    private void initFromEncodedBytes() throws IOException {
         DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXidBytes);
         inputStream.skipBytes(10);
         formatId = inputStream.readInt();

http://git-wip-us.apache.org/repos/asf/activemq/blob/02ef9445/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
index 9bf229d..0a13aa0 100755
--- a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.util;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UTFDataFormatException;
+import java.io.*;
 
 /**
  * Optimized ByteArrayInputStream that can be used more than once
@@ -124,6 +121,14 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return (pos < buf.length) ? (buf[pos++] & 0xff) : -1;
     }
 
+    public int readOrIOException() throws IOException {
+        int rc = read();
+        if( rc == -1 ) {
+            throw new EOFException();
+        }
+        return rc;
+    }
+
     /**
      * Reads up to <code>len</code> bytes of data into an array of bytes from
      * this input stream.
@@ -180,45 +185,48 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return n;
     }
 
-    public boolean readBoolean() {
-        return read() != 0;
+    public boolean readBoolean() throws IOException {
+        return readOrIOException() != 0;
     }
 
-    public byte readByte() {
-        return (byte)read();
+    public byte readByte() throws IOException {
+        return (byte)readOrIOException();
     }
 
-    public int readUnsignedByte() {
-        return read();
+    public int readUnsignedByte() throws IOException {
+        return readOrIOException();
     }
 
-    public short readShort() {
-        int ch1 = read();
-        int ch2 = read();
+    public short readShort() throws IOException {
+        int ch1 = readOrIOException();
+        int ch2 = readOrIOException();
         return (short)((ch1 << 8) + (ch2 << 0));
     }
 
-    public int readUnsignedShort() {
-        int ch1 = read();
-        int ch2 = read();
+    public int readUnsignedShort() throws IOException {
+        int ch1 = readOrIOException();
+        int ch2 = readOrIOException();
         return (ch1 << 8) + (ch2 << 0);
     }
 
-    public char readChar() {
-        int ch1 = read();
-        int ch2 = read();
+    public char readChar() throws IOException {
+        int ch1 = readOrIOException();
+        int ch2 = readOrIOException();
         return (char)((ch1 << 8) + (ch2 << 0));
     }
 
-    public int readInt() {
-        int ch1 = read();
-        int ch2 = read();
-        int ch3 = read();
-        int ch4 = read();
+    public int readInt() throws IOException {
+        int ch1 = readOrIOException();
+        int ch2 = readOrIOException();
+        int ch3 = readOrIOException();
+        int ch4 = readOrIOException();
         return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
     }
 
-    public long readLong() {
+    public long readLong() throws IOException {
+        if (pos >= buf.length ) {
+            throw new EOFException();
+        }
         long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);
         return rc + ((long)(buf[pos++] & 255) << 24) + ((buf[pos++] & 255) << 16) + ((buf[pos++] & 255) << 8) + ((buf[pos++] & 255) << 0);
     }


[2/2] git commit: Fixing https://issues.apache.org/jira/browse/AMQ-4917 : LevelDB store can fail when using durable subs.

Posted by ch...@apache.org.
Fixing https://issues.apache.org/jira/browse/AMQ-4917 : LevelDB store can fail when using durable subs.

We were browsing durable sub entries that had been concurrently GCed, which caused LevelDB store failures that in turn caused the broker to restart.
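
As a rough sketch of the defensive pattern this patch switches to (the class, method and logger names below are placeholders, not the actual Scala code from the diff): cursor callbacks now load a message once and, if the data its locator points at is already gone, log a warning and keep iterating rather than failing the whole browse/recovery pass.

    import java.util.logging.Logger;

    public class SkipMissingEntrySketch {
        private static final Logger LOG =
                Logger.getLogger(SkipMissingEntrySketch.class.getName());

        interface Visitor {
            boolean visit(Object message); // return true to keep the cursor moving
        }

        // Stand-in for LevelDBClient.getMessage(locator): may return null when the
        // log data the locator points at has already been GCed.
        private Object loadMessage(Object locator) {
            return null;
        }

        boolean visitEntry(long seq, Object locator, Visitor visitor) {
            Object msg = loadMessage(locator);
            if (msg != null) {
                return visitor.visit(msg);
            }
            // Previously a missing message eventually surfaced as a store failure;
            // now the entry is skipped with a warning and browsing continues.
            LOG.warning("Could not load message seq: " + seq + " from " + locator);
            return true;
        }
    }

The getMessageWithRetry helper is removed in the same commit; retrying cannot help once the referenced log data has actually been deleted.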


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8378cb1f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8378cb1f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8378cb1f

Branch: refs/heads/trunk
Commit: 8378cb1ffc77715d3c9c8a173805f037afa2935a
Parents: 69e35d6
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Dec 3 19:20:11 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Wed Dec 4 12:09:47 2013 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/leveldb/DBManager.scala |  12 +-
 .../apache/activemq/leveldb/LevelDBClient.scala | 117 +++++++++----------
 .../apache/activemq/leveldb/LevelDBStore.scala  |  10 +-
 .../org/apache/activemq/leveldb/RecordLog.scala |  61 +++++-----
 4 files changed, 104 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8378cb1f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 6b575ee..722d932 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -37,7 +37,9 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync
 import org.fusesource.hawtdispatch
 
 case class EntryLocator(qid:Long, seq:Long)
-case class DataLocator(store:LevelDBStore, pos:Long, len:Int)
+case class DataLocator(store:LevelDBStore, pos:Long, len:Int) {
+  override def toString: String = "DataLocator(%x, %d)".format(pos, len)
+}
 case class MessageRecord(store:LevelDBStore, id:MessageId, data:Buffer, syncNeeded:Boolean) {
   var locator:DataLocator = _
 }
@@ -860,8 +862,12 @@ class DBManager(val parent:LevelDBStore) {
   def getMessage(x: MessageId):Message = {
     val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
     val locator = id.getDataLocator()
-    val msg = client.getMessageWithRetry(locator)
-    msg.setMessageId(id)
+    val msg = client.getMessage(locator)
+    if( msg!=null ) {
+      msg.setMessageId(id)
+    } else {
+      LevelDBStore.warn("Could not load messages for: "+x+" at: "+locator)
+    }
     msg
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8378cb1f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index fe29012..7872ff8 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -21,7 +21,6 @@ import java.{lang=>jl}
 import java.{util=>ju}
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.concurrent.atomic.AtomicBoolean
 import collection.immutable.TreeMap
 import collection.mutable.{HashMap, ListBuffer}
 import org.iq80.leveldb._
@@ -37,20 +36,13 @@ import org.apache.activemq.command.{MessageAck, Message}
 import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
 import java.text.SimpleDateFormat
 import java.util.{Date, Collections}
-import org.apache.activemq.leveldb.util.TimeMetric
-import org.apache.activemq.leveldb.RecordLog.LogInfo
 import org.fusesource.leveldbjni.internal.JniDB
 import org.apache.activemq.ActiveMQMessageAuditNoSync
-import java.util.zip.CRC32
 import org.apache.activemq.leveldb.util.TimeMetric
 import org.fusesource.hawtbuf.ByteArrayInputStream
 import org.apache.activemq.leveldb.RecordLog.LogInfo
 import scala.Some
 import scala.Serializable
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.MessageRecord
-import org.apache.activemq.leveldb.EntryLocator
-import org.apache.activemq.leveldb.DataLocator
 import org.fusesource.hawtbuf.ByteArrayOutputStream
 import org.apache.activemq.broker.SuppressReplyException
 
@@ -772,19 +764,20 @@ class LevelDBClient(store: LevelDBStore) {
 
                     val index_record = new EntryRecord.Bean()
                     index_record.setValueLocation(record.getValueLocation)
-                    index_record.setValueLength(record.getValueLength)
-                    val    index_value = encodeEntryRecord(index_record.freeze()).toByteArray
+                    if( record.hasValueLength ) {
+                      index_record.setValueLength(record.getValueLength)
+                    }
+                    val index_value = encodeEntryRecord(index_record.freeze()).toByteArray
 
                     replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
 
                     if( kind==LOG_ADD_ENTRY ) {
-                      if ( record.hasValueLocation ) {
-                        logRefIncrement(record.getValueLocation)
-                      }
+                      logRefIncrement(record.getValueLocation)
                       collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
+                      trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
+                    } else {
+                      trace("Replay of LOG_UPDATE_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
                     }
-                    trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
-
 
                   case LOG_REMOVE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -834,10 +827,9 @@ class LevelDBClient(store: LevelDBStore) {
 
   private def logRefDecrement(pos: Long) {
     for( key <- logRefKey(pos) ) {
-      logRefs.get(key).foreach { counter =>
-        if (counter.decrementAndGet() == 0) {
-          logRefs.remove(key)
-        }
+      logRefs.get(key) match {
+        case Some(counter) => counter.decrementAndGet() == 0
+        case None => warn("invalid: logRefDecrement: "+pos)
       }
     }
   }
@@ -1252,11 +1244,16 @@ class LevelDBClient(store: LevelDBStore) {
     collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
       val seq = decodeLong(key)
       var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
-      val msg = getMessageWithRetry(locator)
-      msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
-      msg.getMessageId().setDataLocator(locator)
-      msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
-      func(msg)
+      val msg = getMessage(locator)
+      if( msg !=null ) {
+        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
+        msg.getMessageId().setDataLocator(locator)
+        msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
+        func(msg)
+      } else {
+        warn("Could not load message seq: "+seq+" from "+locator)
+        true
+      }
     }
   }
 
@@ -1278,10 +1275,15 @@ class LevelDBClient(store: LevelDBStore) {
         func(XaAckRecord(collectionKey, seq, ack, sub))
       } else {
         var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
-        val msg = getMessageWithRetry(locator)
-        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
-        msg.getMessageId().setDataLocator(locator)
-        func(msg)
+        val msg = getMessage(locator)
+        if( msg !=null ) {
+          msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
+          msg.getMessageId().setDataLocator(locator)
+          func(msg)
+        } else {
+          warn("Could not load XA message seq: "+seq+" from "+locator)
+          true
+        }
       }
     }
   }
@@ -1295,22 +1297,6 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
-  def getMessageWithRetry(locator:AnyRef):Message = {
-    var retry = 0
-    var rc = getMessage(locator);
-    while( rc == null ) {
-      if( retry > 10 )
-        return null;
-      Thread.sleep(retry*10)
-      rc = getMessage(locator);
-      retry+=1
-    }
-    if( retry > 0 ) {
-      info("Recovered from 'failed getMessage' on retry: "+retry)
-    }
-    rc
-  }
-  
   def getMessage(locator:AnyRef):Message = {
     assert(locator!=null)
     val buffer = locator match {
@@ -1487,9 +1473,7 @@ class LevelDBClient(store: LevelDBStore) {
           batch.put(key, index_data)
 
           if( kind==LOG_ADD_ENTRY ) {
-            for (key <- logRefKey(dataLocator.pos, log_info)) {
-              logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
-            }
+            logRefIncrement(dataLocator.pos)
             collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
           }
 
@@ -1537,11 +1521,11 @@ class LevelDBClient(store: LevelDBStore) {
         log_record.setCollectionKey(entry.subKey)
         log_record.setEntryKey(ACK_POSITION)
         log_record.setValueLocation(entry.ackPosition)
-        appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+        appender.append(LOG_UPDATE_ENTRY, encodeEntryRecord(log_record.freeze()))
 
         val index_record = new EntryRecord.Bean()
         index_record.setValueLocation(entry.ackPosition)
-        batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+        batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
       }
 
       if (uow.syncNeeded) {
@@ -1625,17 +1609,6 @@ class LevelDBClient(store: LevelDBStore) {
 
   def gc(topicPositions:Seq[(Long, Long)]):Unit = {
 
-    detect_if_compact_needed
-
-    // Lets compact the leveldb index if it looks like we need to.
-    if( index.compact_needed ) {
-      debug("Compacting the leveldb index at: %s", dirtyIndexFile)
-      val start = System.nanoTime()
-      index.compact
-      val duration = System.nanoTime() - start;
-      info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
-    }
-
     // Delete message refs for topics who's consumers have advanced..
     if( !topicPositions.isEmpty ) {
       might_fail_using_index {
@@ -1646,6 +1619,7 @@ class LevelDBClient(store: LevelDBStore) {
             ro.verifyChecksums(verifyChecksums)
             val start = encodeEntryKey(ENTRY_PREFIX, topic, 0)
             val end =  encodeEntryKey(ENTRY_PREFIX, topic, first)
+            debug("Topic: %d GC to seq: %d", topic, first)
             index.cursorRange(start, end, ro) { case (key, value) =>
               val entry = EntryRecord.FACTORY.parseUnframed(value)
               batch.delete(key)
@@ -1657,8 +1631,30 @@ class LevelDBClient(store: LevelDBStore) {
       }
     }
 
+    detect_if_compact_needed
+
+    // Lets compact the leveldb index if it looks like we need to.
+    if( index.compact_needed ) {
+      val start = System.nanoTime()
+      index.compact
+      val duration = System.nanoTime() - start;
+      info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
+    }
+
     import collection.JavaConversions._
     lastIndexSnapshotPos
+
+    // drop the logs that are no longer referenced.
+    for( (x,y) <- logRefs.toSeq ) {
+      if( y.get() <= 0 ) {
+        if( y.get() < 0 ) {
+          warn("Found a negative log reference for log: "+x)
+        }
+        debug("Log no longer referenced: %x", x)
+        logRefs.remove(x)
+      }
+    }
+
     val emptyJournals = log.log_infos.keySet.toSet -- logRefs.keySet
 
     // We don't want to delete any journals that the index has not snapshot'ed or
@@ -1669,6 +1665,7 @@ class LevelDBClient(store: LevelDBStore) {
 
     emptyJournals.foreach { id =>
       if ( id < deleteLimit ) {
+        debug("Deleting log at %x", id)
         log.delete(id)
       }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8378cb1f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 43238a9..7b90b0c 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -669,6 +669,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
+    def cursorResetPosition = 0L
+
     def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
       check_running
       val seq = lastSeq.incrementAndGet()
@@ -731,7 +733,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     def removeAllMessages(context: ConnectionContext): Unit = {
       check_running
       db.collectionEmpty(key)
-      cursorPosition = 0
+      cursorPosition = cursorResetPosition
     }
 
     def getMessageCount: Int = {
@@ -744,11 +746,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def recover(listener: MessageRecoveryListener): Unit = {
       check_running
-      cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
+      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition)
     }
 
     def resetBatching: Unit = {
-      cursorPosition = 0
+      cursorPosition = cursorResetPosition
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
@@ -789,6 +791,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
     var firstSeq = 0L
 
+    override def cursorResetPosition = firstSeq
+
     def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8378cb1f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index 28e1be1..08f18a7 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -517,42 +517,43 @@ case class RecordLog(directory: File, logSuffix:String) {
     log_infos.map(_._2.position).toArray
   }
 
-  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
-
-    val lookup = log_mutex.synchronized {
-      val info = log_info(record_position)
-      info.map { info=>
-        if(info.position == current_appender.position) {
-          current_appender.retain()
-          (info, current_appender)
-        } else {
-          (info, null)
-        }
+  private def get_reader[T](record_position:Long)(func: (LogReader)=>T):Option[T] = {
+
+    val (info, appender) = log_mutex.synchronized {
+      log_info(record_position) match {
+        case None =>
+          warn("No reader available for position: %x, log_infos: %s", record_position, log_infos)
+          return None
+        case Some(info) =>
+          if(info.position == current_appender.position) {
+            current_appender.retain()
+            (info, current_appender)
+          } else {
+            (info, null)
+          }
       }
     }
 
-    lookup.map { case (info, appender) =>
-      val reader = if( appender!=null ) {
-        // read from the current appender.
-        appender
-      } else {
-        // Checkout a reader from the cache...
-        reader_cache.synchronized {
-          var reader = reader_cache.get(info.file)
-          if(reader==null) {
-            reader = LogReader(info.file, info.position)
-            reader_cache.put(info.file, reader)
-          }
-          reader.retain()
-          reader
+    val reader = if( appender!=null ) {
+      // read from the current appender.
+      appender
+    } else {
+      // Checkout a reader from the cache...
+      reader_cache.synchronized {
+        var reader = reader_cache.get(info.file)
+        if(reader==null) {
+          reader = LogReader(info.file, info.position)
+          reader_cache.put(info.file, reader)
         }
+        reader.retain()
+        reader
       }
+    }
 
-      try {
-        func(reader)
-      } finally {
-        reader.release
-      }
+    try {
+      Some(func(reader))
+    } finally {
+      reader.release
     }
   }