Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/04 15:26:13 UTC

[1/4] git commit: leveldb store: Add more log traces for when we need to get more details on what's going on.

Updated Branches:
  refs/heads/trunk 85fc68600 -> a907fc9e9


leveldb store: Add more log traces for when we need to get more details on what's going on.
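
The statements added below go through the store's printf-style trace/debug helpers, which take a format string plus arguments. A minimal sketch of that pattern, assuming an slf4j-backed helper (the project's actual util.Log trait may differ), looks like this; it formats only when TRACE is enabled, so the extra statements stay cheap when the level is off.

    // Minimal sketch, assuming slf4j; names here are illustrative, not the
    // store's actual util.Log trait.
    import org.slf4j.{Logger, LoggerFactory}

    trait TraceSupport {
      protected lazy val log: Logger = LoggerFactory.getLogger(getClass)

      // Format lazily so the String.format cost is only paid at TRACE level.
      def trace(message: String, args: Any*): Unit = {
        if (log.isTraceEnabled) {
          log.trace(if (args.isEmpty) message else message.format(args: _*))
        }
      }
    }

    // e.g. trace("%s: Got WAL ack, position: %d, from: %s", directory, position, slaveId)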


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a907fc9e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a907fc9e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a907fc9e

Branch: refs/heads/trunk
Commit: a907fc9e942db4cdb58e3cfb6b366aaa9497e00e
Parents: b1d8cbe
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:24:36 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../scala/org/apache/activemq/leveldb/LevelDBClient.scala    | 4 ++--
 .../main/scala/org/apache/activemq/leveldb/RecordLog.scala   | 2 +-
 .../activemq/leveldb/replicated/MasterLevelDBStore.scala     | 2 ++
 .../activemq/leveldb/replicated/SlaveLevelDBStore.scala      | 8 ++++++--
 activemq-leveldb-store/src/test/resources/log4j.properties   | 2 +-
 5 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a907fc9e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index b130a22..1a0dd35 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -602,7 +602,7 @@ class LevelDBClient(store: LevelDBStore) {
         Some(this.getClass.getClassLoader.loadClass(name).newInstance().asInstanceOf[DBFactory])
       } catch {
         case e:Throwable =>
-          debug(e, "Could not load factory: "+name+" due to: "+e)
+          debug("Could not load factory: "+name+" due to: "+e)
           None
       }
     }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: "+factoryNames))
@@ -822,7 +822,7 @@ class LevelDBClient(store: LevelDBStore) {
           throw e;
       } finally {
         recoveryLogs = null
-        debug("Replay of journal done")
+        debug("Replay end")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a907fc9e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index e69b58a..28e1be1 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -313,7 +313,7 @@ case class RecordLog(directory: File, logSuffix:String) {
       val prefix = is.readByte()
       if( prefix != LOG_HEADER_PREFIX ) {
         // Does not look like a record.
-        throw new IOException("invalid record position %d (file: %s, offset: %d)".format(record_position, file.getName, offset))
+        throw new IOException("invalid record position %d (file: %s, offset: %d)".format(record_position, file.getAbsolutePath, offset))
       }
       val id = is.readByte()
       val expectedChecksum = is.readInt()

http://git-wip-us.apache.org/repos/asf/activemq/blob/a907fc9e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index f50e556..0381627 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -236,6 +236,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       if( login == null || slave_state == null) {
         return;
       }
+      trace("%s: Got WAL ack, position: %d, from: %s", directory, req.position, slave_state.slave_id)
       slave_state.position_update(req.position)
     }
 
@@ -398,6 +399,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       value.date = date
       wal_date = value.date;
       value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
+      trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length)
       val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
       val frame2 = FileTransferFrame(file, offset, length)
       for( slave <- slaves.values() ) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a907fc9e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 7befe9d..07ef0ee 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -178,14 +178,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
             if( caughtUp && value.offset ==0 && value.file!=0 ) {
               client.log.rotate
             }
+            trace("%s, Slave WAL update: (file:%s, offset: %d, length: %d)".format(directory, value.file.toHexString, value.offset, value.length))
             val file = client.log.next_log(value.file)
             val buffer = map(file, value.offset, value.length, false)
             session.codec.readData(buffer, ^{
               if( value.sync ) {
                 buffer.force()
               }
+
               unmap(buffer)
-//              info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
               wal_append_offset = value.offset+value.length
               wal_append_position = value.file + wal_append_offset
               wal_date = value.date
@@ -296,7 +297,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    debug("Connecting download session.")
+    debug("%s: Connecting download session. Snapshot index at: %s".format(directory, state.snapshot_position.toHexString))
     transfer_session = new Session(transport, (session)=> {
 
       var total_files = 0
@@ -360,6 +361,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
             val buffer = map(target_file, 0, x.length, false)
             session.codec.readData(buffer, ^{
               unmap(buffer)
+              trace("%s, Downloaded %s, offset:%d, length:%d", directory, transfer.file, transfer.offset, transfer.length)
               downloaded_size += x.length
               downloaded_files += 1
               update_download_status
@@ -384,6 +386,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
           val buffer = map(dirty_index / x.file, 0, x.length, false)
           session.codec.readData(buffer, ^{
             unmap(buffer)
+            trace("%s, Downloaded %s, offset:%d, length:%d", directory, transfer.file, transfer.offset, transfer.length)
             downloaded_size += x.length
             downloaded_files += 1
             update_download_status
@@ -405,6 +408,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
         }
         client.writeExecutor {
           if( !state.index_files.isEmpty ) {
+            trace("%s: Index sync complete, copying to snapshot.", directory)
             client.copyDirtyIndexToSnapshot(state.wal_append_position)
           }
           client.replay_init()

http://git-wip-us.apache.org/repos/asf/activemq/blob/a907fc9e/activemq-leveldb-store/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/resources/log4j.properties b/activemq-leveldb-store/src/test/resources/log4j.properties
index fd5a31b..da0480a 100755
--- a/activemq-leveldb-store/src/test/resources/log4j.properties
+++ b/activemq-leveldb-store/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.fusesource=INFO
+log4j.logger.org.apache.activemq.leveldb=INFO
 
 # Console will only display warnnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
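
Note that the statements added in this commit are emitted at TRACE, so the INFO threshold above still hides them. When that detail is needed, the same category can be raised locally, e.g. with the following hypothetical override (not part of this commit, and only visible through an appender whose own threshold allows it):

    # hypothetical local override to surface the new trace messages
    log4j.logger.org.apache.activemq.leveldb=TRACE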


[4/4] git commit: Continue to append to the last leveldb log file on a store restart.

Posted by ch...@apache.org.
Continue to append to the last leveldb log file on a store restart.
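
In other words, on open the newest log file is scanned to find the end of the last complete unit of work, anything past that offset is truncated away, and the appender is positioned at that offset in the same file instead of rotating to a new one. A minimal standalone sketch of that recovery step, using hypothetical names rather than the store's API:

    // Illustrative sketch of the restart behaviour this commit introduces:
    // truncate the newest log back to the last verified complete unit of
    // work, then keep appending to that same file.
    import java.io.{File, RandomAccessFile}

    object ReopenSketch {
      def reopenForAppend(lastLog: File, verifiedEndOffset: Long): RandomAccessFile = {
        val raf = new RandomAccessFile(lastLog, "rw")
        raf.getChannel.truncate(verifiedEndOffset)  // drop any partial trailing records
        raf.getChannel.force(true)                  // make the truncation durable
        raf.seek(verifiedEndOffset)                 // continue appending from here
        raf
      }
    }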


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2824a94a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2824a94a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2824a94a

Branch: refs/heads/trunk
Commit: 2824a94af4040552feba3818b84ac2fdf1f4dffe
Parents: 85fc686
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:12:15 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/leveldb/RecordLog.scala | 34 +++++++++++---------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2824a94a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index d35becd..e69b58a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -376,7 +376,7 @@ case class RecordLog(directory: File, logSuffix:String) {
       return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
     }
 
-    def verifyAndGetEndPosition:Long = {
+    def verifyAndGetEndOffset:Long = {
       var pos = position;
       var current_uow_start = pos
       val limit = position+channel.size()
@@ -387,15 +387,15 @@ case class RecordLog(directory: File, logSuffix:String) {
               if( uow_start_pos == current_uow_start ) {
                 current_uow_start = next
               } else {
-                return current_uow_start
+                return current_uow_start-position
               }
             }
             pos = next
           case None =>
-            return current_uow_start
+            return current_uow_start-position
         }
       }
-      return current_uow_start
+      return current_uow_start-position
     }
   }
 
@@ -417,33 +417,37 @@ case class RecordLog(directory: File, logSuffix:String) {
   val max_log_flush_latency = TimeMetric()
   val max_log_rotate_latency = TimeMetric()
 
-  def open(append_size:Long= -1) = {
+  def open(appender_size:Long= -1) = {
     log_mutex.synchronized {
       log_infos.clear()
       LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
         log_infos.put(position, LogInfo(file, position, file.length()))
       }
 
-      val appendPos = if( log_infos.isEmpty ) {
+      if( log_infos.isEmpty ) {
         create_appender(0,0)
       } else {
         val file = log_infos.lastEntry().getValue
-        if( append_size == -1 ) {
+        if( appender_size == -1 ) {
           val r = LogReader(file.file, file.position)
           try {
-            val actualLength = r.verifyAndGetEndPosition
-            val updated = file.copy(length = actualLength - file.position)
-            log_infos.put(updated.position, updated)
-            if( updated.file.length != file.length ) {
-              // we need to truncate.
-              using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+            val endOffset = r.verifyAndGetEndOffset
+            using(new RandomAccessFile(file.file, "rw")) { file=>
+              try {
+                file.getChannel.truncate(endOffset)
+              }
+              catch {
+                case e:Throwable =>
+                  e.printStackTrace()
+              }
+              file.getChannel.force(true)
             }
-            create_appender(actualLength,0)
+            create_appender(file.position,endOffset)
           } finally {
             r.release()
           }
         } else {
-          create_appender(file.position,append_size)
+          create_appender(file.position,appender_size)
         }
       }
     }


[3/4] git commit: Fixes for leveldb replication: make sure we only apply index updates when we encounter a UOW_END_RECORD so that we don't end up with an inconsistent index if a partial UOW is replicated.

Posted by ch...@apache.org.
Fixes for leveldb replication: make sure we only apply index updates when we encounter a UOW_END_RECORD so that we don't end up with an inconsistent index if a partial UOW is replicated.
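
Concretely, the replay loop now stages index puts and deletes in a leveldb WriteBatch and only writes the batch when it reaches a UOW_END_RECORD, so a unit of work that was only partially replicated never reaches the index. A hedged sketch of that pattern against the org.iq80.leveldb API, with illustrative record kinds rather than the store's constants:

    // Hedged sketch: stage index mutations in a WriteBatch and commit only
    // at UOW_END_RECORD, so a partial unit of work never touches the index.
    // The record shape and kind values below are illustrative stand-ins.
    import org.iq80.leveldb.{DB, WriteBatch}

    object ReplaySketch {
      case class JournalRecord(kind: Int, key: Array[Byte], value: Array[Byte])

      val LOG_ADD_ENTRY    = 1   // placeholder values
      val LOG_REMOVE_ENTRY = 2
      val UOW_END_RECORD   = 3

      def replay(db: DB, records: Iterator[JournalRecord]): Unit = {
        var batch: WriteBatch = db.createWriteBatch()
        for (r <- records) r.kind match {
          case LOG_ADD_ENTRY    => batch.put(r.key, r.value)
          case LOG_REMOVE_ENTRY => batch.delete(r.key)
          case UOW_END_RECORD   =>
            db.write(batch)                 // unit of work is complete: apply it
            batch.close()
            batch = db.createWriteBatch()   // start staging the next one
          case _ =>                         // other kinds don't modify the index
        }
        batch.close()                       // a trailing partial UOW is simply dropped
      }
    }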


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b1d8cbe4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b1d8cbe4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b1d8cbe4

Branch: refs/heads/trunk
Commit: b1d8cbe4cd91c53ac44db57f9952dc559a1db742
Parents: dea38e6
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:21:30 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala | 42 +++++++++++++++++---
 1 file changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b1d8cbe4/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index dbf6512..b130a22 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -31,7 +31,7 @@ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
 import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File}
+import java.io._
 import scala.Option._
 import org.apache.activemq.command.{MessageAck, Message}
 import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
@@ -41,6 +41,17 @@ import org.apache.activemq.leveldb.util.TimeMetric
 import org.apache.activemq.leveldb.RecordLog.LogInfo
 import org.fusesource.leveldbjni.internal.JniDB
 import org.apache.activemq.ActiveMQMessageAuditNoSync
+import java.util.zip.CRC32
+import org.apache.activemq.leveldb.util.TimeMetric
+import org.fusesource.hawtbuf.ByteArrayInputStream
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import scala.Some
+import scala.Serializable
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.MessageRecord
+import org.apache.activemq.leveldb.EntryLocator
+import org.apache.activemq.leveldb.DataLocator
+import org.fusesource.hawtbuf.ByteArrayOutputStream
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -551,6 +562,7 @@ class LevelDBClient(store: LevelDBStore) {
       log.open()
     }
     replay_from(lastIndexSnapshotPos, log.appender_limit)
+    replay_write_batch = null;
   }
 
   def init() ={
@@ -678,7 +690,13 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
+  var replay_write_batch: WriteBatch = null
+
   def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = {
+    debug("Replay of journal from: %d to %d.", from, limit)
+    if( replay_write_batch==null ) {
+      replay_write_batch = index.db.createWriteBatch()
+    }
     might_fail {
       try {
         // Update the index /w what was stored on the logs..
@@ -719,11 +737,13 @@ class LevelDBClient(store: LevelDBStore) {
                   case LOG_DATA =>
                     val message = decodeMessage(data)
                     store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
+                    trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId)
 
                   case LOG_ADD_COLLECTION =>
                     val record= decodeCollectionRecord(data)
-                    index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
+                    replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
                     collectionMeta.put(record.getKey, new CollectionMeta)
+                    trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey)
 
                   case LOG_REMOVE_COLLECTION =>
                     val record = decodeCollectionKeyRecord(data)
@@ -741,6 +761,7 @@ class LevelDBClient(store: LevelDBStore) {
                     }
                     index.delete(data)
                     collectionMeta.remove(record.getKey)
+                    trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey)
 
                   case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -750,7 +771,7 @@ class LevelDBClient(store: LevelDBStore) {
                     index_record.setValueLength(record.getValueLength)
                     val    index_value = encodeEntryRecord(index_record.freeze()).toByteArray
 
-                    index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
+                    replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
 
                     if( kind==LOG_ADD_ENTRY ) {
                       if ( record.hasValueLocation ) {
@@ -758,6 +779,7 @@ class LevelDBClient(store: LevelDBStore) {
                       }
                       collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
                     }
+                    trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
 
                   case LOG_REMOVE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -767,10 +789,18 @@ class LevelDBClient(store: LevelDBStore) {
                       logRefDecrement(record.getValueLocation)
                     }
 
-                    index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
+                    replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
                     collectionDecrementSize( record.getCollectionKey)
+                    trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
 
-                  case _ => // Skip other records, they don't modify the index.
+                  case LOG_TRACE =>
+                    trace("Replay of LOG_TRACE, message: %s", pos, data.ascii())
+                  case RecordLog.UOW_END_RECORD =>
+                    trace("Replay of UOW_END_RECORD")
+                    index.db.write(replay_write_batch)
+                    replay_write_batch=index.db.createWriteBatch()
+                  case kind => // Skip other records, they don't modify the index.
+                    trace("Skipping replay of %d record kind at %d", kind, pos)
 
                 }
                 pos = nextPos
@@ -788,9 +818,11 @@ class LevelDBClient(store: LevelDBStore) {
         case e:Throwable =>
           // replay failed.. good thing we are in a retry block...
           index.close
+          replay_write_batch = null
           throw e;
       } finally {
         recoveryLogs = null
+        debug("Replay of journal done")
       }
     }
   }


[2/4] git commit: leveldb replication: Let's always download the current append log just to be safe

Posted by ch...@apache.org.
leveldb replication: Let's always download the current append log just to be safe
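
The slave previously tried to truncate and reuse a stashed copy of the master's current append log when its CRC matched; now it simply re-downloads that file and only reuses the other logs on a CRC match. A small sketch of the resulting decision, using hypothetical types rather than the store's:

    // Sketch of the reuse decision after this change: the master's current
    // append log is always re-downloaded; any other stashed log is reused
    // only when its cached CRC32 and length match the master's.
    object ReuseSketch {
      case class FileInfo(file: Long, length: Long, crc32: Long)

      def canReuseStashed(stashed: FileInfo, master: FileInfo, appendLog: Long): Boolean = {
        if (master.file == appendLog) {
          false // the master may still be appending to this file; play it safe
        } else {
          stashed.length == master.length && stashed.crc32 == master.crc32
        }
      }
    }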


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dea38e62
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dea38e62
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dea38e62

Branch: refs/heads/trunk
Commit: dea38e62f893a3ef3aa7cc21b87aef357f6fcae2
Parents: 2824a94
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:17:40 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../leveldb/replicated/SlaveLevelDBStore.scala     | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dea38e62/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index f1a47f7..7befe9d 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -332,20 +332,11 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
           if (stashed_file.length() == x.length )
             return stashed_file.cached_crc32 == x.crc32
 
-          if ( stashed_file.crc32(x.length) == x.crc32 ) {
-            // we don't want to truncate the log file currently being appended to.
-            if( x.file != state.append_log ) {
-              // Our log file might be longer. lets truncate to match.
-              val raf = new RandomAccessFile(stashed_file, "rw")
-              try {
-                raf.setLength(x.length)
-              } finally {
-                raf.close();
-              }
-            }
-            return true;
+          if( x.file == state.append_log ) {
+            return false;
           }
-          return false
+
+          return stashed_file.cached_crc32 == x.crc32
         }
 
         // We don't have to transfer log files that have been previously transferred.