You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/03 13:48:01 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6277 - tidy up logic that determines recovery location so that we don't recovery from the end of the journal in error on normal restart. This avoids suprious recovery logging

Repository: activemq
Updated Branches:
  refs/heads/master 3dd86d04e -> ba77b9f55


https://issues.apache.org/jira/browse/AMQ-6277 - tidy up logic that determines recovery location so that we don't recovery from the end of the journal in error on normal restart. This avoids suprious recovery logging


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ba77b9f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ba77b9f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ba77b9f5

Branch: refs/heads/master
Commit: ba77b9f55a627846ecab63916b2667f234022d34
Parents: 3dd86d0
Author: gtully <ga...@gmail.com>
Authored: Tue May 3 12:47:24 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue May 3 12:47:49 2016 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 23 +++++++-----
 .../activemq/store/kahadb/KahaDBTest.java       | 37 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3e754f7..4a23cbc 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -629,8 +629,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             Location ackMessageFileLocation = recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
 
-            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
-            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
+            Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation);
+            recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition);
 
             if (recoveryPosition != null) {
                 int redoCounter = 0;
@@ -711,16 +711,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return TransactionIdConversion.convertToLocal(tx);
     }
 
-    private Location minimum(Location producerAuditPosition,
-            Location lastIndoubtPosition) {
+    private Location startOfRecovery(Location x,
+            Location y) {
         Location min = null;
-        if (producerAuditPosition != null) {
-            min = producerAuditPosition;
-            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
-                min = lastIndoubtPosition;
+        if (x != null) {
+            min = x;
+            if (y != null) {
+                int compare = y.compareTo(x);
+                if (compare < 0) {
+                    min = y;
+                } else if (compare == 0) {
+                    min = null; // no recovery needed on a matched location
+                }
             }
         } else {
-            min = lastIndoubtPosition;
+            min = y;
         }
         return min;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
index 78f4c60..bd81524 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -31,6 +32,10 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 
 /**
  * @author chirino
@@ -193,6 +198,38 @@ public class KahaDBTest extends TestCase {
         broker.stop();
     }
 
+    public void testNoReplayOnStopStart() throws Exception {
+        KahaDBStore kaha = createStore(true);
+        BrokerService broker = createBroker(kaha);
+        sendMessages(100);
+        broker.stop();
+        broker.waitUntilStopped();
+
+        kaha = createStore(false);
+        kaha.setCheckForCorruptJournalFiles(true);
+
+        final AtomicBoolean didSomeRecovery = new AtomicBoolean(false);
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel() == Level.INFO && event.getRenderedMessage().contains("Recovering from the journal @")) {
+                    didSomeRecovery.set(true);
+                }
+            }
+        };
+
+        Logger.getRootLogger().addAppender(appender);
+
+        broker = createBroker(kaha);
+
+        int count = receiveMessages();
+        assertEquals("Expected to received all messages.", count, 100);
+        broker.stop();
+
+        Logger.getRootLogger().addAppender(appender);
+        assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
+    }
+
     private void assertExistsAndDelete(File file) {
         assertTrue(file.exists());
         file.delete();