You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/09/15 12:51:04 UTC
activemq git commit: [AMQ-6277] take account of producer audit not
being updated on recovery check, avoid unnecessary partial journal replay
Repository: activemq
Updated Branches:
refs/heads/master ed395d1a8 -> a359d8152
[AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a359d815
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a359d815
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a359d815
Branch: refs/heads/master
Commit: a359d8152cfee6f2fe95d34fd1b2296f6ed2670c
Parents: ed395d1
Author: gtully <ga...@gmail.com>
Authored: Fri Sep 15 13:48:03 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Sep 15 13:48:03 2017 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 35 ++++++--------
.../kahadb/KahaDBPersistenceAdapterTest.java | 49 ++++++++++++++++++--
2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 40e8f95..a6d3cc8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
long start = System.currentTimeMillis();
- Location afterProducerAudit = recoverProducerAudit();
- Location afterAckMessageFile = recoverAckMessageFileMap();
+ boolean requiresJournalReplay = recoverProducerAudit();
+ requiresJournalReplay |= recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
-
- if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
- // valid checkpoint, possible recover from afterAckMessageFile
- afterProducerAudit = null;
- }
- Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
- recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
-
+ Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
if (recoveryPosition != null) {
int redoCounter = 0;
int dataFileRotationTracker = recoveryPosition.getDataFileId();
@@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return min;
}
- private Location recoverProducerAudit() throws IOException {
+ private boolean recoverProducerAudit() throws IOException {
+ boolean requiresReplay = true;
if (metadata.producerSequenceIdTrackerLocation != null) {
try {
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
- return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+ requiresReplay = false;
} catch (Exception e) {
LOG.warn("Cannot recover message audit", e);
- return journal.getNextLocation(null);
}
- } else {
- // got no audit stored so got to recreate via replay from start of the journal
- return journal.getNextLocation(null);
}
+ // got no audit stored so got to recreate via replay from start of the journal
+ return requiresReplay;
}
@SuppressWarnings("unchecked")
- private Location recoverAckMessageFileMap() throws IOException {
+ private boolean recoverAckMessageFileMap() throws IOException {
+ boolean requiresReplay = true;
if (metadata.ackMessageFileMapLocation != null) {
try {
KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
- return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+ requiresReplay = false;
} catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e);
- return journal.getNextLocation(null);
}
- } else {
- // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
- return journal.getNextLocation(null);
}
+ // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+ return requiresReplay;
}
protected void recoverIndex(Transaction tx) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
index c45c3e5..f509011 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.File;
-import java.io.IOException;
-
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterTestSupport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
}
return kaha;
}
+
+ public void testNoReplayOnStop() throws Exception {
+ brokerService.getPersistenceAdapter().checkpoint(true);
+ brokerService.stop();
+
+ final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
+ final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
+
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ trappedLogMessages.set(true);
+ if (event.getLevel().equals(Level.INFO)) {
+ if (event.getMessage().toString().contains("Recovery replayed ")) {
+ gotSomeReplay.set(true);
+ }
+ }
+ }
+ };
+
+ try {
+ Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
+ Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
+
+ brokerService = new BrokerService();
+ pa = createPersistenceAdapter(false);
+ brokerService.setPersistenceAdapter(pa);
+ brokerService.start();
+
+ } finally {
+ Logger.getRootLogger().removeAppender(appender);
+ Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
+ }
+ assertTrue("log capture working", trappedLogMessages.get());
+ assertFalse("no replay message in the log", gotSomeReplay.get());
+ }
}