You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/04 15:26:15 UTC

[3/4] git commit: Fixes for leveldb replication: make sure we only apply index updates when we encounter a UOW_END_RECORD so that we don't end up with an inconsistent index if a partial UOW is replicated.

Fixes for leveldb replication: make sure we only apply index updates when we encounter a UOW_END_RECORD so that we don't end up with an inconsistent index if a partial UOW is replicated.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b1d8cbe4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b1d8cbe4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b1d8cbe4

Branch: refs/heads/trunk
Commit: b1d8cbe4cd91c53ac44db57f9952dc559a1db742
Parents: dea38e6
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 1 14:21:30 2013 -0400
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 4 09:26:01 2013 -0500

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala | 42 +++++++++++++++++---
 1 file changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b1d8cbe4/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index dbf6512..b130a22 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -31,7 +31,7 @@ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
 import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File}
+import java.io._
 import scala.Option._
 import org.apache.activemq.command.{MessageAck, Message}
 import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
@@ -41,6 +41,17 @@ import org.apache.activemq.leveldb.util.TimeMetric
 import org.apache.activemq.leveldb.RecordLog.LogInfo
 import org.fusesource.leveldbjni.internal.JniDB
 import org.apache.activemq.ActiveMQMessageAuditNoSync
+import java.util.zip.CRC32
+import org.apache.activemq.leveldb.util.TimeMetric
+import org.fusesource.hawtbuf.ByteArrayInputStream
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import scala.Some
+import scala.Serializable
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.MessageRecord
+import org.apache.activemq.leveldb.EntryLocator
+import org.apache.activemq.leveldb.DataLocator
+import org.fusesource.hawtbuf.ByteArrayOutputStream
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -551,6 +562,7 @@ class LevelDBClient(store: LevelDBStore) {
       log.open()
     }
     replay_from(lastIndexSnapshotPos, log.appender_limit)
+    replay_write_batch = null;
   }
 
   def init() ={
@@ -678,7 +690,13 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
+  var replay_write_batch: WriteBatch = null
+
   def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = {
+    debug("Replay of journal from: %d to %d.", from, limit)
+    if( replay_write_batch==null ) {
+      replay_write_batch = index.db.createWriteBatch()
+    }
     might_fail {
       try {
         // Update the index /w what was stored on the logs..
@@ -719,11 +737,13 @@ class LevelDBClient(store: LevelDBStore) {
                   case LOG_DATA =>
                     val message = decodeMessage(data)
                     store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
+                    trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId)
 
                   case LOG_ADD_COLLECTION =>
                     val record= decodeCollectionRecord(data)
-                    index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
+                    replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
                     collectionMeta.put(record.getKey, new CollectionMeta)
+                    trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey)
 
                   case LOG_REMOVE_COLLECTION =>
                     val record = decodeCollectionKeyRecord(data)
@@ -741,6 +761,7 @@ class LevelDBClient(store: LevelDBStore) {
                     }
                     index.delete(data)
                     collectionMeta.remove(record.getKey)
+                    trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey)
 
                   case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -750,7 +771,7 @@ class LevelDBClient(store: LevelDBStore) {
                     index_record.setValueLength(record.getValueLength)
                     val    index_value = encodeEntryRecord(index_record.freeze()).toByteArray
 
-                    index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
+                    replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
 
                     if( kind==LOG_ADD_ENTRY ) {
                       if ( record.hasValueLocation ) {
@@ -758,6 +779,7 @@ class LevelDBClient(store: LevelDBStore) {
                       }
                       collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
                     }
+                    trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
 
                   case LOG_REMOVE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -767,10 +789,18 @@ class LevelDBClient(store: LevelDBStore) {
                       logRefDecrement(record.getValueLocation)
                     }
 
-                    index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
+                    replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
                     collectionDecrementSize( record.getCollectionKey)
+                    trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
 
-                  case _ => // Skip other records, they don't modify the index.
+                  case LOG_TRACE =>
+                    trace("Replay of LOG_TRACE, message: %s", pos, data.ascii())
+                  case RecordLog.UOW_END_RECORD =>
+                    trace("Replay of UOW_END_RECORD")
+                    index.db.write(replay_write_batch)
+                    replay_write_batch=index.db.createWriteBatch()
+                  case kind => // Skip other records, they don't modify the index.
+                    trace("Skipping replay of %d record kind at %d", kind, pos)
 
                 }
                 pos = nextPos
@@ -788,9 +818,11 @@ class LevelDBClient(store: LevelDBStore) {
         case e:Throwable =>
           // replay failed.. good thing we are in a retry block...
           index.close
+          replay_write_batch = null
           throw e;
       } finally {
         recoveryLogs = null
+        debug("Replay of journal done")
       }
     }
   }