You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/04 21:11:11 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6277 - journal getNextLocation needs two passes to skip past if target is not initialized

Repository: activemq
Updated Branches:
  refs/heads/master a28a091c5 -> 1c4108545


https://issues.apache.org/jira/browse/AMQ-6277 - journal getNextLocation needs two passes to skip past if target is not initialized


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1c410854
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1c410854
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1c410854

Branch: refs/heads/master
Commit: 1c4108545c1cdc7ba6017d11015d8233b1218e0f
Parents: a28a091c
Author: gtully <ga...@gmail.com>
Authored: Wed May 4 22:09:06 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 4 22:09:06 2016 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 36 ++++++++++++++------
 1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1c410854/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 4a23cbc..7252bb9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -111,6 +111,8 @@ import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
+
 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
     protected BrokerService brokerService;
@@ -625,12 +627,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
 
             long start = System.currentTimeMillis();
-            Location producerAuditPosition = recoverProducerAudit();
-            Location ackMessageFileLocation = recoverAckMessageFileMap();
+            Location afterProducerAudit = recoverProducerAudit();
+            Location afterAckMessageFile = recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
 
-            Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation);
-            recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition);
+            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
+                // valid checkpoint, possible recover from afterAckMessageFile
+                afterProducerAudit = null;
+            }
+            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
+            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
 
             if (recoveryPosition != null) {
                 int redoCounter = 0;
@@ -711,8 +717,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return TransactionIdConversion.convertToLocal(tx);
     }
 
-    private Location startOfRecovery(Location x,
-            Location y) {
+    private Location minimum(Location x,
+                             Location y) {
         Location min = null;
         if (x != null) {
             min = x;
@@ -720,8 +726,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 int compare = y.compareTo(x);
                 if (compare < 0) {
                     min = y;
-                } else if (compare == 0) {
-                    min = null; // no recovery needed on a matched location
                 }
             }
         } else {
@@ -740,7 +744,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
                 return journal.getNextLocation(null);
@@ -758,7 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             try {
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
+                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
                 return journal.getNextLocation(null);
@@ -986,13 +990,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             // Perhaps there were no transactions...
             if( metadata.lastUpdate!=null) {
                 // Start replay at the record after the last one recorded in the index file.
-                return journal.getNextLocation(metadata.lastUpdate);
+                return getNextInitializedLocation(metadata.lastUpdate);
             }
         }
         // This loads the first position.
         return journal.getNextLocation(null);
     }
 
+    private Location getNextInitializedLocation(Location location) throws IOException {
+        Location mayNotBeInitialized = journal.getNextLocation(location);
+        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
+            // need to init size and type to skip
+            return journal.getNextLocation(mayNotBeInitialized);
+        } else {
+            return mayNotBeInitialized;
+        }
+    }
+
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
         long start;
         this.indexLock.writeLock().lock();