You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/08 16:34:35 UTC

git commit: Replicated leveldb slaves' index snapshots were being labeled with higher journal positions than they actually contained.

Updated Branches:
  refs/heads/trunk 0d3884005 -> 42e1c463d


Replicated leveldb slaves' index snapshots were being labeled with higher journal positions than they actually contained.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42e1c463
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42e1c463
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42e1c463

Branch: refs/heads/trunk
Commit: 42e1c463d4df7b210c9694aaf9dbe36c3d29a01b
Parents: 0d38840
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Fri Nov 8 10:25:50 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Fri Nov 8 10:34:21 2013 -0500

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala | 49 +++++++++--------
 .../leveldb/replicated/SlaveLevelDBStore.scala  |  7 ++-
 .../test/ReplicatedLevelDBStoreTest.java        | 57 ++++++++++++--------
 3 files changed, 68 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/42e1c463/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 1a0dd35..15f7bb0 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -671,6 +671,7 @@ class LevelDBClient(store: LevelDBStore) {
       for( (id, file)<- lastSnapshotIndex ) {
         try {
           copyIndex(file, dirtyIndexFile)
+          debug("Recovering from last index snapshot at: "+dirtyIndexFile)
         } catch {
           case e:Exception =>
             warn(e, "Could not recover snapshot of the index: "+e)
@@ -678,11 +679,9 @@ class LevelDBClient(store: LevelDBStore) {
         }
       }
       index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
-      if ( store.paranoidChecks ) {
-        for(value <- index.get(DIRTY_INDEX_KEY) ) {
-          if( java.util.Arrays.equals(value, TRUE) ) {
-            warn("Recovering from a dirty index.")
-          }
+      for(value <- index.get(DIRTY_INDEX_KEY) ) {
+        if( java.util.Arrays.equals(value, TRUE) ) {
+          warn("Recovering from a dirty index.")
         }
       }
       index.put(DIRTY_INDEX_KEY, TRUE)
@@ -691,6 +690,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   var replay_write_batch: WriteBatch = null
+  var indexRecoveryPosition = 0L
 
   def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = {
     debug("Replay of journal from: %d to %d.", from, limit)
@@ -700,19 +700,19 @@ class LevelDBClient(store: LevelDBStore) {
     might_fail {
       try {
         // Update the index /w what was stored on the logs..
-        var pos = from;
+        indexRecoveryPosition = from;
         var last_reported_at = System.currentTimeMillis();
         var showing_progress = false
         var last_reported_pos = 0L
         try {
-          while (pos < limit) {
+          while (indexRecoveryPosition < limit) {
 
             if( print_progress ) {
               val now = System.currentTimeMillis();
               if( now > last_reported_at+1000 ) {
-                val at = pos-from
+                val at = indexRecoveryPosition-from
                 val total = limit-from
-                val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at)
+                val rate = (indexRecoveryPosition-last_reported_pos)*1000.0 / (now - last_reported_at)
                 val eta = (total-at)/rate
                 val remaining = if(eta > 60*60) {
                   "%.2f hrs".format(eta/(60*60))
@@ -726,24 +726,24 @@ class LevelDBClient(store: LevelDBStore) {
                   at*100.0/total, at, total, rate/1024, remaining))
                 showing_progress = true;
                 last_reported_at = now
-                last_reported_pos = pos
+                last_reported_pos = indexRecoveryPosition
               }
             }
 
 
-            log.read(pos).map {
+            log.read(indexRecoveryPosition).map {
               case (kind, data, nextPos) =>
                 kind match {
                   case LOG_DATA =>
                     val message = decodeMessage(data)
                     store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
-                    trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId)
+                    trace("Replay of LOG_DATA at %d, message id: ", indexRecoveryPosition, message.getMessageId)
 
                   case LOG_ADD_COLLECTION =>
                     val record= decodeCollectionRecord(data)
                     replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
                     collectionMeta.put(record.getKey, new CollectionMeta)
-                    trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey)
+                    trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey)
 
                   case LOG_REMOVE_COLLECTION =>
                     val record = decodeCollectionKeyRecord(data)
@@ -761,7 +761,7 @@ class LevelDBClient(store: LevelDBStore) {
                     }
                     index.delete(data)
                     collectionMeta.remove(record.getKey)
-                    trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey)
+                    trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey)
 
                   case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -779,7 +779,8 @@ class LevelDBClient(store: LevelDBStore) {
                       }
                       collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
                     }
-                    trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
+                    trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
+
 
                   case LOG_REMOVE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -791,19 +792,19 @@ class LevelDBClient(store: LevelDBStore) {
 
                     replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
                     collectionDecrementSize( record.getCollectionKey)
-                    trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
+                    trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
 
                   case LOG_TRACE =>
-                    trace("Replay of LOG_TRACE, message: %s", pos, data.ascii())
+                    trace("Replay of LOG_TRACE, message: %s", indexRecoveryPosition, data.ascii())
                   case RecordLog.UOW_END_RECORD =>
                     trace("Replay of UOW_END_RECORD")
                     index.db.write(replay_write_batch)
                     replay_write_batch=index.db.createWriteBatch()
                   case kind => // Skip other records, they don't modify the index.
-                    trace("Skipping replay of %d record kind at %d", kind, pos)
+                    trace("Skipping replay of %d record kind at %d", kind, indexRecoveryPosition)
 
                 }
-                pos = nextPos
+                indexRecoveryPosition = nextPos
             }
           }
         }
@@ -989,10 +990,11 @@ class LevelDBClient(store: LevelDBStore) {
           index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true))
           index.close
           index = null
+          debug("Gracefuly closed the index")
+          copyDirtyIndexToSnapshot
         }
         if (log!=null && log.isOpen) {
           log.close
-          copyDirtyIndexToSnapshot
           stored_wal_append_position = log.appender_limit
           log = null
         }
@@ -1043,15 +1045,18 @@ class LevelDBClient(store: LevelDBStore) {
     snapshotRwLock.writeLock().unlock()
   }
 
+  def nextIndexSnapshotPos:Long = wal_append_position
+
   def copyDirtyIndexToSnapshot:Unit = {
-    if( log.appender_limit == lastIndexSnapshotPos  ) {
+    if( nextIndexSnapshotPos == lastIndexSnapshotPos  ) {
       // no need to snapshot again...
       return
     }
-    copyDirtyIndexToSnapshot(log.appender_limit)
+    copyDirtyIndexToSnapshot(nextIndexSnapshotPos)
   }
 
   def copyDirtyIndexToSnapshot(walPosition:Long):Unit = {
+    debug("Taking a snapshot of the current index: "+snapshotIndexFile(walPosition))
     // Where we start copying files into.  Delete this on
     // restart.
     val tmpDir = tempIndexFile

http://git-wip-us.apache.org/repos/asf/activemq/blob/42e1c463/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 2fd7c1e..2d288bd 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -58,9 +58,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     // he slave is caught up.
     override def post_log_rotate: Unit = {
       if( caughtUp ) {
-        super.post_log_rotate
+        writeExecutor {
+          snapshotIndex(false)
+        }
       }
     }
+
+    // The snapshots we create are based on what has been replayed.
+    override def nextIndexSnapshotPos:Long = indexRecoveryPosition
   }
 
   override def doStart() = {

http://git-wip-us.apache.org/repos/asf/activemq/blob/42e1c463/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
index 119b08f..d9ba101 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
@@ -118,21 +118,22 @@ public class ReplicatedLevelDBStoreTest {
         return f;
     }
 
-    @Test(timeout = 1000*60*60)
+    @Test(timeout = 1000*60*20)
     public void testReplication() throws Exception {
 
         LinkedList<File> directories = new LinkedList<File>();
         directories.add(new File("target/activemq-data/leveldb-node1"));
         directories.add(new File("target/activemq-data/leveldb-node2"));
         directories.add(new File("target/activemq-data/leveldb-node3"));
+        resetDirectories(directories);
 
-        for (File f : directories) {
-            FileSupport.toRichFile(f).recursiveDelete();
-        }
+        // For some reason this had to be 64k to trigger a bug where
+        // slave index snapshots were being done incorrectly.
+        String playload = createPlayload(64*1024);
 
         ArrayList<String> expected_list = new ArrayList<String>();
         // We will rotate between 3 nodes the task of being the master.
-        for (int j = 0; j < 10; j++) {
+        for (int j = 0; j < 5; j++) {
 
             MasterLevelDBStore master = createMaster(directories.get(0));
             CountDownFuture masterStart = asyncStart(master);
@@ -141,8 +142,12 @@ public class ReplicatedLevelDBStoreTest {
             asyncStart(slave2);
             masterStart.await();
 
-            LOG.info("Adding messages...");
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+
+            LOG.info("Checking: "+master.getDirectory());
+            assertEquals(expected_list, getMessages(ms));
+
+            LOG.info("Adding messages...");
             final int TOTAL = 500;
             for (int i = 0; i < TOTAL; i++) {
                 if (i % ((int) (TOTAL * 0.10)) == 0) {
@@ -152,19 +157,23 @@ public class ReplicatedLevelDBStoreTest {
                 if (i == 250) {
                     slave1.start();
                     slave2.stop();
+                    LOG.info("Checking: "+master.getDirectory());
+                    assertEquals(expected_list, getMessages(ms));
                 }
 
                 String msgid = "m:" + j + ":" + i;
-                addMessage(ms, msgid);
+                addMessage(ms, msgid, playload);
                 expected_list.add(msgid);
             }
 
-            LOG.info("Checking master state");
+            LOG.info("Checking: "+master.getDirectory());
             assertEquals(expected_list, getMessages(ms));
 
-            LOG.info("Stopping master: " + master.node_id());
+            LOG.info("Stopping master: " + master.getDirectory());
             master.stop();
-            LOG.info("Stopping slave: " + slave1.node_id());
+
+            Thread.sleep(3*1000);
+            LOG.info("Stopping slave: " + slave1.getDirectory());
             slave1.stop();
 
             // Rotate the dir order so that slave1 becomes the master next.
@@ -172,22 +181,26 @@ public class ReplicatedLevelDBStoreTest {
         }
     }
 
+    void resetDirectories(LinkedList<File> directories) {
+        for (File directory : directories) {
+            FileSupport.toRichFile(directory).recursiveDelete();
+            directory.mkdirs();
+            FileSupport.toRichFile(new File(directory, "nodeid.txt")).writeText(directory.getName(), "UTF-8");
+        }
+    }
+
     @Test(timeout = 1000*60*60)
     public void testSlowSlave() throws Exception {
 
-        File node1Dir = new File("target/activemq-data/leveldb-node1");
-        File node2Dir = new File("target/activemq-data/leveldb-node2");
-        File node3Dir = new File("target/activemq-data/leveldb-node3");
-
-        FileSupport.toRichFile(node1Dir).recursiveDelete();
-        FileSupport.toRichFile(node2Dir).recursiveDelete();
-        FileSupport.toRichFile(node3Dir).recursiveDelete();
-
-        node2Dir.mkdirs();
-        node3Dir.mkdirs();
-        FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8");
-        FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8");
+        LinkedList<File> directories = new LinkedList<File>();
+        directories.add(new File("target/activemq-data/leveldb-node1"));
+        directories.add(new File("target/activemq-data/leveldb-node2"));
+        directories.add(new File("target/activemq-data/leveldb-node3"));
+        resetDirectories(directories);
 
+        File node1Dir = directories.get(0);
+        File node2Dir = directories.get(1);
+        File node3Dir = directories.get(2);
 
         ArrayList<String> expected_list = new ArrayList<String>();