You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/11 13:17:02 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6288
Repository: activemq
Updated Branches:
refs/heads/master c81a9348e -> e53e34026
https://issues.apache.org/jira/browse/AMQ-6288
The ack compaction task now acquires the checkpoint lock while it runs
to prevent a checkpoint from running at the same time unintentionally.
Also, journal.getNextLocation is now protected by a try/catch to handle
errors.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e53e3402
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e53e3402
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e53e3402
Branch: refs/heads/master
Commit: e53e340262d5e57a11464c323606529430e9b832
Parents: c81a934
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed May 11 12:26:16 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed May 11 13:16:57 2016 +0000
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 109 +++++++++++++------
1 file changed, 73 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e53e3402/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index a3addcc..21530c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1888,46 +1888,67 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
- // Lock index to capture the ackMessageFileMap data
- indexLock.writeLock().lock();
-
try {
- // Map keys might not be sorted, find the earliest log file to forward acks
- // from and move only those, future cycles can chip away at more as needed.
- // We won't move files that are themselves rewritten on a previous compaction.
- List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
- Collections.sort(journalFileIds);
- for (Integer journalFileId : journalFileIds) {
- DataFile current = journal.getDataFileById(journalFileId);
- if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
- journalToAdvance = journalFileId;
- break;
- }
- }
+ //acquire the checkpoint lock to prevent other threads from
+ //running a checkpoint while this is running
+ //
+ //Normally this task runs on the same executor as the checkpoint task
+ //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
+ //
+ //However, there are two cases where this isn't always true.
+ //First, the checkpoint() method is public and can be called through the
+ //PersistenceAdapter interface by someone at the same time this is running.
+ //Second, a checkpoint is called during shutdown without using the executor.
+ //
+ //In the future it might be better to just remove the checkpointLock entirely
+ //and only use the executor but this would need to be examined for any unintended
+ //consequences
+ checkpointLock.writeLock().lock();
- // Check if we found one, or if we only found the current file being written to.
- if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
- return;
- }
+ try {
- journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
+ // Lock index to capture the ackMessageFileMap data
+ indexLock.writeLock().lock();
+
+ // Map keys might not be sorted, find the earliest log file to forward acks
+ // from and move only those, future cycles can chip away at more as needed.
+ // We won't move files that are themselves rewritten on a previous compaction.
+ List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+ Collections.sort(journalFileIds);
+ for (Integer journalFileId : journalFileIds) {
+ DataFile current = journal.getDataFileById(journalFileId);
+ if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+ journalToAdvance = journalFileId;
+ break;
+ }
+ }
- } finally {
- indexLock.writeLock().unlock();
- }
+ // Check if we found one, or if we only found the current file being written to.
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+ return;
+ }
- try {
- // Background rewrite of the old acks
- forwardAllAcks(journalToAdvance, journalLogsReferenced);
+ journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
- // Checkpoint with changes from the ackMessageFileMap
- checkpointUpdate(false);
- } catch (IOException ioe) {
- LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
- } catch (Throwable e) {
- LOG.error("Checkpoint failed", e);
- brokerService.handleIOException(IOExceptionSupport.create(e));
+ } finally {
+ indexLock.writeLock().unlock();
+ }
+
+ try {
+ // Background rewrite of the old acks
+ forwardAllAcks(journalToAdvance, journalLogsReferenced);
+
+ // Checkpoint with changes from the ackMessageFileMap
+ checkpointUpdate(false);
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ } catch (Throwable e) {
+ LOG.error("Checkpoint failed", e);
+ brokerService.handleIOException(IOExceptionSupport.create(e));
+ }
+ } finally {
+ checkpointLock.writeLock().unlock();
}
}
}
@@ -1949,7 +1970,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
- Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
+ Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
JournalCommand<?> command = null;
try {
@@ -1964,7 +1985,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
}
- nextLocation = journal.getNextLocation(nextLocation);
+ nextLocation = getNextLocationForAckForward(nextLocation);
}
}
@@ -1994,6 +2015,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
}
+ private Location getNextLocationForAckForward(final Location nextLocation) {
+ //getNextLocation() can throw an IOException, we should handle it and set
+ //nextLocation to null and abort gracefully
+ //Should not happen in the normal case
+ Location location = null;
+ try {
+ location = journal.getNextLocation(nextLocation);
+ } catch (IOException e) {
+ LOG.warn("Failed to load next journal location: {}", e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to load next journal location", e);
+ }
+ }
+ return location;
+ }
+
final Runnable nullCompletionCallback = new Runnable() {
@Override
public void run() {