Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/09 19:04:40 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6285

Repository: activemq
Updated Branches:
  refs/heads/master 1a8e17fbc -> 60b0c4f85


https://issues.apache.org/jira/browse/AMQ-6285

Properly null out the scheduler service in MessageDatabase after it is
shut down on a store close, so that if the store is restarted the thread
will restart properly.
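
For context, a minimal sketch of the guard this commit introduces: both the
stop and the (re)start paths take schedulerLock, stop nulls the scheduler
field after shutting it down, and start only creates a new scheduler when the
field is null. The class below is illustrative, not the MessageDatabase code;
only the schedulerLock/scheduler fields and ThreadPoolUtils.shutdownGraceful(scheduler, -1)
appear in the diff, and the restart side is implied by the commit message
rather than shown in this excerpt.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: the stop side mirrors the hunk below, the start
// side shows why nulling the field matters (a restarted store can lazily
// create a fresh scheduler instead of reusing a terminated executor).
class SchedulerLifecycleSketch {

    private final Object schedulerLock = new Object();
    private ScheduledExecutorService scheduler;

    void start() {
        synchronized (schedulerLock) {
            if (scheduler == null) {
                scheduler = Executors.newSingleThreadScheduledExecutor();
                scheduler.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        checkpoint();
                    }
                }, 1, 1, TimeUnit.SECONDS);
            }
        }
    }

    void stop() {
        synchronized (schedulerLock) {
            if (scheduler != null) {
                // MessageDatabase uses ThreadPoolUtils.shutdownGraceful(scheduler, -1)
                scheduler.shutdown();
                // null out so a later start() can recreate the thread
                scheduler = null;
            }
        }
    }

    void checkpoint() {
        // periodic checkpoint/cleanup work would run here
    }
}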


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/60b0c4f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/60b0c4f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/60b0c4f8

Branch: refs/heads/master
Commit: 60b0c4f85ada06875e09b1bc3fbefac0f9fb6156
Parents: 1a8e17f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon May 9 19:03:38 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon May 9 19:03:38 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 95 ++++++++------------
 1 file changed, 38 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
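
Two of the hunks below are worth a note beyond the AMQ-6285 fix itself: the
old getNextInitializedLocation() helper is folded into plain
journal.getNextLocation() calls, and the minimum(...) helper that picks the
earliest recovery position is simplified. As a rough, hedged sketch of what
that helper computes (a generic stand-in, not the MessageDatabase code;
KahaDB's Location is Comparable, as the compareTo call in the hunk shows):

// Generic stand-in for the null-tolerant "earliest location" helper in the
// diff: returns whichever non-null argument compares lower, or null when
// both are null.
class LocationMinimumSketch {

    static <T extends Comparable<T>> T minimum(T a, T b) {
        if (a == null) {
            return b;
        }
        if (b == null) {
            return a;
        }
        return b.compareTo(a) < 0 ? b : a;
    }

    public static void main(String[] args) {
        // quick check with Integers as a stand-in for journal Locations
        System.out.println(minimum(5, 3));    // 3
        System.out.println(minimum(null, 3)); // 3
        System.out.println(minimum(5, null)); // 5
    }
}

(In the hunk the parameters are renamed after one call site, but the helper is
also called with other pairs of locations, so the generic form above is what
it effectively computes.)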


http://git-wip-us.apache.org/repos/asf/activemq/blob/60b0c4f8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index f148971..c1af2fe 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -111,8 +111,6 @@ import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
-
 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
     protected BrokerService brokerService;
@@ -471,7 +469,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 checkpointLock.writeLock().unlock();
             }
             journal.close();
-            ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+            synchronized(schedulerLock) {
+                if (scheduler != null) {
+                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+                    scheduler = null;
+                }
+            }
             // clear the cache and journalSize on shutdown of the store
             storeCache.clear();
             journalSize.set(0);
@@ -627,15 +630,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
 
             long start = System.currentTimeMillis();
-            Location afterProducerAudit = recoverProducerAudit();
-            Location afterAckMessageFile = recoverAckMessageFileMap();
+            Location producerAuditPosition = recoverProducerAudit();
+            Location ackMessageFileLocation = recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
 
-            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
-                // valid checkpoint, possible recover from afterAckMessageFile
-                afterProducerAudit = null;
-            }
-            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
+            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
             recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
 
             if (recoveryPosition != null) {
@@ -717,19 +716,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return TransactionIdConversion.convertToLocal(tx);
     }
 
-    private Location minimum(Location x,
-                             Location y) {
+    private Location minimum(Location producerAuditPosition,
+            Location lastIndoubtPosition) {
         Location min = null;
-        if (x != null) {
-            min = x;
-            if (y != null) {
-                int compare = y.compareTo(x);
-                if (compare < 0) {
-                    min = y;
-                }
+        if (producerAuditPosition != null) {
+            min = producerAuditPosition;
+            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
+                min = lastIndoubtPosition;
             }
         } else {
-            min = y;
+            min = lastIndoubtPosition;
         }
         return min;
     }
@@ -744,7 +740,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
                 return journal.getNextLocation(null);
@@ -762,7 +758,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             try {
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
                 return journal.getNextLocation(null);
@@ -990,23 +986,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             // Perhaps there were no transactions...
             if( metadata.lastUpdate!=null) {
                 // Start replay at the record after the last one recorded in the index file.
-                return getNextInitializedLocation(metadata.lastUpdate);
+                return journal.getNextLocation(metadata.lastUpdate);
             }
         }
         // This loads the first position.
         return journal.getNextLocation(null);
     }
 
-    private Location getNextInitializedLocation(Location location) throws IOException {
-        Location mayNotBeInitialized = journal.getNextLocation(location);
-        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
-            // need to init size and type to skip
-            return journal.getNextLocation(mayNotBeInitialized);
-        } else {
-            return mayNotBeInitialized;
-        }
-    }
-
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
         long start;
         this.indexLock.writeLock().lock();
@@ -1879,37 +1865,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         @Override
         public void run() {
-
-            int journalToAdvance = -1;
-            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
-
             // Lock index to capture the ackMessageFileMap data
             indexLock.writeLock().lock();
 
-            try {
-                // Map keys might not be sorted, find the earliest log file to forward acks
-                // from and move only those, future cycles can chip away at more as needed.
-                // We won't move files that are themselves rewritten on a previous compaction.
-                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
-                Collections.sort(journalFileIds);
-                for (Integer journalFileId : journalFileIds) {
-                    DataFile current = journal.getDataFileById(journalFileId);
-                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
-                        journalToAdvance = journalFileId;
-                        break;
-                    }
+            // Map keys might not be sorted, find the earliest log file to forward acks
+            // from and move only those, future cycles can chip away at more as needed.
+            // We won't move files that are themselves rewritten on a previous compaction.
+            List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+            Collections.sort(journalFileIds);
+            int journalToAdvance = -1;
+            for (Integer journalFileId : journalFileIds) {
+                DataFile current = journal.getDataFileById(journalFileId);
+                if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+                    journalToAdvance = journalFileId;
+                    break;
                 }
+            }
 
-                // Check if we found one, or if we only found the current file being written to.
-                if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
-                    return;
-                }
+            // Check if we found one, or if we only found the current file being written to.
+            if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                return;
+            }
 
-                journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
+            Set<Integer> journalLogsReferenced =
+                new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
 
-            } finally {
-                indexLock.writeLock().unlock();
-            }
+            indexLock.writeLock().unlock();
 
             try {
                 // Background rewrite of the old acks