You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/11 13:17:02 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6288

Repository: activemq
Updated Branches:
  refs/heads/master c81a9348e -> e53e34026


https://issues.apache.org/jira/browse/AMQ-6288

The ack compaction task now acquires the checkpoint lock while it runs
to prevent a checkpoint from running at the same time unintentionally.
Also, getJournalLocation is now protected by a try/catch to handle
errors.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e53e3402
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e53e3402
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e53e3402

Branch: refs/heads/master
Commit: e53e340262d5e57a11464c323606529430e9b832
Parents: c81a934
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed May 11 12:26:16 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed May 11 13:16:57 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 109 +++++++++++++------
 1 file changed, 73 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e53e3402/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index a3addcc..21530c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1888,46 +1888,67 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             int journalToAdvance = -1;
             Set<Integer> journalLogsReferenced = new HashSet<Integer>();
 
-            // Lock index to capture the ackMessageFileMap data
-            indexLock.writeLock().lock();
-
             try {
-                // Map keys might not be sorted, find the earliest log file to forward acks
-                // from and move only those, future cycles can chip away at more as needed.
-                // We won't move files that are themselves rewritten on a previous compaction.
-                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
-                Collections.sort(journalFileIds);
-                for (Integer journalFileId : journalFileIds) {
-                    DataFile current = journal.getDataFileById(journalFileId);
-                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
-                        journalToAdvance = journalFileId;
-                        break;
-                    }
-                }
+                //acquire the checkpoint lock to prevent other threads from
+                //running a checkpoint while this is running
+                //
+                //Normally this task runs on the same executor as the checkpoint task
+                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
+                //
+                //However, there are two cases where this isn't always true.
+                //First, the checkpoint() method is public and can be called through the
+                //PersistenceAdapter interface by someone at the same time this is running.
+                //Second, a checkpoint is called during shutdown without using the executor.
+                //
+                //In the future it might be better to just remove the checkpointLock entirely
+                //and only use the executor but this would need to be examined for any unintended
+                //consequences
+                checkpointLock.writeLock().lock();
 
-                // Check if we found one, or if we only found the current file being written to.
-                if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
-                    return;
-                }
+                try {
 
-                journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
+                    // Lock index to capture the ackMessageFileMap data
+                    indexLock.writeLock().lock();
+
+                    // Map keys might not be sorted, find the earliest log file to forward acks
+                    // from and move only those, future cycles can chip away at more as needed.
+                    // We won't move files that are themselves rewritten on a previous compaction.
+                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+                    Collections.sort(journalFileIds);
+                    for (Integer journalFileId : journalFileIds) {
+                        DataFile current = journal.getDataFileById(journalFileId);
+                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+                            journalToAdvance = journalFileId;
+                            break;
+                        }
+                    }
 
-            } finally {
-                indexLock.writeLock().unlock();
-            }
+                    // Check if we found one, or if we only found the current file being written to.
+                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                        return;
+                    }
 
-            try {
-                // Background rewrite of the old acks
-                forwardAllAcks(journalToAdvance, journalLogsReferenced);
+                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
 
-                // Checkpoint with changes from the ackMessageFileMap
-                checkpointUpdate(false);
-            } catch (IOException ioe) {
-                LOG.error("Checkpoint failed", ioe);
-                brokerService.handleIOException(ioe);
-            } catch (Throwable e) {
-                LOG.error("Checkpoint failed", e);
-                brokerService.handleIOException(IOExceptionSupport.create(e));
+                } finally {
+                    indexLock.writeLock().unlock();
+                }
+
+                try {
+                    // Background rewrite of the old acks
+                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
+
+                    // Checkpoint with changes from the ackMessageFileMap
+                    checkpointUpdate(false);
+                } catch (IOException ioe) {
+                    LOG.error("Checkpoint failed", ioe);
+                    brokerService.handleIOException(ioe);
+                } catch (Throwable e) {
+                    LOG.error("Checkpoint failed", e);
+                    brokerService.handleIOException(IOExceptionSupport.create(e));
+                }
+            } finally {
+                checkpointLock.writeLock().unlock();
             }
         }
     }
@@ -1949,7 +1970,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
             LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
 
-            Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
+            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
             while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
                 JournalCommand<?> command = null;
                 try {
@@ -1964,7 +1985,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
                 }
 
-                nextLocation = journal.getNextLocation(nextLocation);
+                nextLocation = getNextLocationForAckForward(nextLocation);
             }
         }
 
@@ -1994,6 +2015,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
     }
 
+    private Location getNextLocationForAckForward(final Location nextLocation) {
+        //getNextLocation() can throw an IOException, we should handle it and set
+        //nextLocation to null and abort gracefully
+        //Should not happen in the normal case
+        Location location = null;
+        try {
+            location = journal.getNextLocation(nextLocation);
+        } catch (IOException e) {
+            LOG.warn("Failed to load next journal location: {}", e.getMessage());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to load next journal location", e);
+            }
+        }
+        return location;
+    }
+
     final Runnable nullCompletionCallback = new Runnable() {
         @Override
         public void run() {