You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/09/15 13:14:30 UTC

[1/2] activemq git commit: [AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 938745141 -> e1699d58c


[AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay

(cherry picked from commit a359d8152cfee6f2fe95d34fd1b2296f6ed2670c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d894d570
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d894d570
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d894d570

Branch: refs/heads/activemq-5.15.x
Commit: d894d570d4254cc9bb6bca0ab6add2bafe2acc11
Parents: 9387451
Author: gtully <ga...@gmail.com>
Authored: Fri Sep 15 13:48:03 2017 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Sep 15 09:03:47 2017 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 35 ++++++--------
 .../kahadb/KahaDBPersistenceAdapterTest.java    | 49 ++++++++++++++++++--
 2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d894d570/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 40e8f95..a6d3cc8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
 
             long start = System.currentTimeMillis();
-            Location afterProducerAudit = recoverProducerAudit();
-            Location afterAckMessageFile = recoverAckMessageFileMap();
+            boolean requiresJournalReplay = recoverProducerAudit();
+            requiresJournalReplay |= recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
-
-            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
-                // valid checkpoint, possible recover from afterAckMessageFile
-                afterProducerAudit = null;
-            }
-            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
-            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
-
+            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
             if (recoveryPosition != null) {
                 int redoCounter = 0;
                 int dataFileRotationTracker = recoveryPosition.getDataFileId();
@@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return min;
     }
 
-    private Location recoverProducerAudit() throws IOException {
+    private boolean recoverProducerAudit() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.producerSequenceIdTrackerLocation != null) {
             try {
                 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no audit stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no audit stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     @SuppressWarnings("unchecked")
-    private Location recoverAckMessageFileMap() throws IOException {
+    private boolean recoverAckMessageFileMap() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.ackMessageFileMapLocation != null) {
             try {
                 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     protected void recoverIndex(Transaction tx) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d894d570/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
index c45c3e5..f509011 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.IOException;
-
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
         }
         return kaha;
     }
+
+    public void testNoReplayOnStop() throws Exception {
+        brokerService.getPersistenceAdapter().checkpoint(true);
+        brokerService.stop();
+
+        final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
+        final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                trappedLogMessages.set(true);
+                if (event.getLevel().equals(Level.INFO)) {
+                    if (event.getMessage().toString().contains("Recovery replayed ")) {
+                        gotSomeReplay.set(true);
+                    }
+                }
+            }
+        };
+
+        try {
+            Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
+
+            brokerService = new BrokerService();
+            pa = createPersistenceAdapter(false);
+            brokerService.setPersistenceAdapter(pa);
+            brokerService.start();
+
+        } finally {
+            Logger.getRootLogger().removeAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
+        }
+        assertTrue("log capture working", trappedLogMessages.get());
+        assertFalse("no replay message in the log", gotSomeReplay.get());
+    }
 }


[2/2] activemq git commit: AMQ-6808 preserve destination for browsed scheduled messages

Posted by ta...@apache.org.
AMQ-6808 preserve destination for browsed scheduled messages

(cherry picked from commit c691124d3288ffc9299bedca69b952952f2cbcf6)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e1699d58
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e1699d58
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e1699d58

Branch: refs/heads/activemq-5.15.x
Commit: e1699d58c899700a221662d6f4c728cc306ee97b
Parents: d894d57
Author: Erik Wramner <er...@codemint.com>
Authored: Tue Sep 12 21:46:02 2017 +0200
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Sep 15 09:03:58 2017 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/broker/scheduler/SchedulerBroker.java    | 4 ++++
 .../activemq/broker/scheduler/JobSchedulerManagementTest.java    | 4 ++++
 2 files changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e1699d58/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
index a00b456..5542973 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
@@ -426,6 +426,10 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
             msg.setPersistent(false);
             msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
             msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+
+            // Preserve original destination
+            msg.setOriginalDestination(msg.getDestination());
+
             msg.setDestination(replyTo);
             msg.setResponseRequired(false);
             msg.setProducerId(this.producerId);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e1699d58/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index c944be3..6f6dc76 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -34,6 +34,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.util.IdGenerator;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -396,6 +397,9 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertNotNull(message);
         assertEquals(45000, message.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));
 
+        // Verify that original destination was preserved
+        assertEquals(destination, ((ActiveMQMessage) message).getOriginalDestination());
+
         // Now check if there are anymore, there shouldn't be
         message = browser.receive(5000);
         assertNull(message);