You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/03 13:48:01 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6277 -
tidy up logic that determines recovery location so that we don't recover
from the end of the journal in error on normal restart. This avoids spurious
recovery logging
Repository: activemq
Updated Branches:
refs/heads/master 3dd86d04e -> ba77b9f55
https://issues.apache.org/jira/browse/AMQ-6277 - tidy up logic that determines recovery location so that we don't recover from the end of the journal in error on normal restart. This avoids spurious recovery logging
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ba77b9f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ba77b9f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ba77b9f5
Branch: refs/heads/master
Commit: ba77b9f55a627846ecab63916b2667f234022d34
Parents: 3dd86d0
Author: gtully <ga...@gmail.com>
Authored: Tue May 3 12:47:24 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue May 3 12:47:49 2016 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 23 +++++++-----
.../activemq/store/kahadb/KahaDBTest.java | 37 ++++++++++++++++++++
2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3e754f7..4a23cbc 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -629,8 +629,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Location ackMessageFileLocation = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
- Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
- recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
+ Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation);
+ recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
int redoCounter = 0;
@@ -711,16 +711,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return TransactionIdConversion.convertToLocal(tx);
}
- private Location minimum(Location producerAuditPosition,
- Location lastIndoubtPosition) {
+ private Location startOfRecovery(Location x,
+ Location y) {
Location min = null;
- if (producerAuditPosition != null) {
- min = producerAuditPosition;
- if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
- min = lastIndoubtPosition;
+ if (x != null) {
+ min = x;
+ if (y != null) {
+ int compare = y.compareTo(x);
+ if (compare < 0) {
+ min = y;
+ } else if (compare == 0) {
+ min = null; // no recovery needed on a matched location
+ }
}
} else {
- min = lastIndoubtPosition;
+ min = y;
}
return min;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
index 78f4c60..bd81524 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -31,6 +32,10 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
/**
* @author chirino
@@ -193,6 +198,38 @@ public class KahaDBTest extends TestCase {
broker.stop();
}
+ public void testNoReplayOnStopStart() throws Exception {
+ KahaDBStore kaha = createStore(true);
+ BrokerService broker = createBroker(kaha);
+ sendMessages(100);
+ broker.stop();
+ broker.waitUntilStopped();
+
+ kaha = createStore(false);
+ kaha.setCheckForCorruptJournalFiles(true);
+
+ final AtomicBoolean didSomeRecovery = new AtomicBoolean(false);
+ DefaultTestAppender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getLevel() == Level.INFO && event.getRenderedMessage().contains("Recovering from the journal @")) {
+ didSomeRecovery.set(true);
+ }
+ }
+ };
+
+ Logger.getRootLogger().addAppender(appender);
+
+ broker = createBroker(kaha);
+
+ int count = receiveMessages();
+ assertEquals("Expected to received all messages.", count, 100);
+ broker.stop();
+
+ Logger.getRootLogger().addAppender(appender);
+ assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
+ }
+
private void assertExistsAndDelete(File file) {
assertTrue(file.exists());
file.delete();