You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/09/15 12:51:04 UTC

activemq git commit: [AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay

Repository: activemq
Updated Branches:
  refs/heads/master ed395d1a8 -> a359d8152


[AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a359d815
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a359d815
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a359d815

Branch: refs/heads/master
Commit: a359d8152cfee6f2fe95d34fd1b2296f6ed2670c
Parents: ed395d1
Author: gtully <ga...@gmail.com>
Authored: Fri Sep 15 13:48:03 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Sep 15 13:48:03 2017 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 35 ++++++--------
 .../kahadb/KahaDBPersistenceAdapterTest.java    | 49 ++++++++++++++++++--
 2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 40e8f95..a6d3cc8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
 
             long start = System.currentTimeMillis();
-            Location afterProducerAudit = recoverProducerAudit();
-            Location afterAckMessageFile = recoverAckMessageFileMap();
+            boolean requiresJournalReplay = recoverProducerAudit();
+            requiresJournalReplay |= recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
-
-            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
-                // valid checkpoint, possible recover from afterAckMessageFile
-                afterProducerAudit = null;
-            }
-            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
-            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
-
+            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
             if (recoveryPosition != null) {
                 int redoCounter = 0;
                 int dataFileRotationTracker = recoveryPosition.getDataFileId();
@@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return min;
     }
 
-    private Location recoverProducerAudit() throws IOException {
+    private boolean recoverProducerAudit() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.producerSequenceIdTrackerLocation != null) {
             try {
                 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no audit stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no audit stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     @SuppressWarnings("unchecked")
-    private Location recoverAckMessageFileMap() throws IOException {
+    private boolean recoverAckMessageFileMap() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.ackMessageFileMapLocation != null) {
             try {
                 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     protected void recoverIndex(Transaction tx) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
index c45c3e5..f509011 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.IOException;
-
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
         }
         return kaha;
     }
+
+    public void testNoReplayOnStop() throws Exception {
+        brokerService.getPersistenceAdapter().checkpoint(true);
+        brokerService.stop();
+
+        final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
+        final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                trappedLogMessages.set(true);
+                if (event.getLevel().equals(Level.INFO)) {
+                    if (event.getMessage().toString().contains("Recovery replayed ")) {
+                        gotSomeReplay.set(true);
+                    }
+                }
+            }
+        };
+
+        try {
+            Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
+
+            brokerService = new BrokerService();
+            pa = createPersistenceAdapter(false);
+            brokerService.setPersistenceAdapter(pa);
+            brokerService.start();
+
+        } finally {
+            Logger.getRootLogger().removeAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
+        }
+        assertTrue("log capture working", trappedLogMessages.get());
+        assertFalse("no replay message in the log", gotSomeReplay.get());
+    }
 }