You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/09 19:04:40 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6285
Repository: activemq
Updated Branches:
refs/heads/master 1a8e17fbc -> 60b0c4f85
https://issues.apache.org/jira/browse/AMQ-6285
Properly null out the scheduler service in MessageDatabase after it
is shut down on a store close so that if the store is restarted the thread will
properly restart.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/60b0c4f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/60b0c4f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/60b0c4f8
Branch: refs/heads/master
Commit: 60b0c4f85ada06875e09b1bc3fbefac0f9fb6156
Parents: 1a8e17f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon May 9 19:03:38 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon May 9 19:03:38 2016 +0000
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 95 ++++++++------------
1 file changed, 38 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/60b0c4f8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index f148971..c1af2fe 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -111,8 +111,6 @@ import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
-
public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
protected BrokerService brokerService;
@@ -471,7 +469,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
checkpointLock.writeLock().unlock();
}
journal.close();
- ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+ synchronized(schedulerLock) {
+ if (scheduler != null) {
+ ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+ scheduler = null;
+ }
+ }
// clear the cache and journalSize on shutdown of the store
storeCache.clear();
journalSize.set(0);
@@ -627,15 +630,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
long start = System.currentTimeMillis();
- Location afterProducerAudit = recoverProducerAudit();
- Location afterAckMessageFile = recoverAckMessageFileMap();
+ Location producerAuditPosition = recoverProducerAudit();
+ Location ackMessageFileLocation = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
- if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
- // valid checkpoint, possible recover from afterAckMessageFile
- afterProducerAudit = null;
- }
- Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
+ Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
@@ -717,19 +716,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return TransactionIdConversion.convertToLocal(tx);
}
- private Location minimum(Location x,
- Location y) {
+ private Location minimum(Location producerAuditPosition,
+ Location lastIndoubtPosition) {
Location min = null;
- if (x != null) {
- min = x;
- if (y != null) {
- int compare = y.compareTo(x);
- if (compare < 0) {
- min = y;
- }
+ if (producerAuditPosition != null) {
+ min = producerAuditPosition;
+ if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
+ min = lastIndoubtPosition;
}
} else {
- min = y;
+ min = lastIndoubtPosition;
}
return min;
}
@@ -744,7 +740,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
- return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+ return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
} catch (Exception e) {
LOG.warn("Cannot recover message audit", e);
return journal.getNextLocation(null);
@@ -762,7 +758,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
- return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+ return journal.getNextLocation(metadata.ackMessageFileMapLocation);
} catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e);
return journal.getNextLocation(null);
@@ -990,23 +986,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Perhaps there were no transactions...
if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
- return getNextInitializedLocation(metadata.lastUpdate);
+ return journal.getNextLocation(metadata.lastUpdate);
}
}
// This loads the first position.
return journal.getNextLocation(null);
}
- private Location getNextInitializedLocation(Location location) throws IOException {
- Location mayNotBeInitialized = journal.getNextLocation(location);
- if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
- // need to init size and type to skip
- return journal.getNextLocation(mayNotBeInitialized);
- } else {
- return mayNotBeInitialized;
- }
- }
-
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
this.indexLock.writeLock().lock();
@@ -1879,37 +1865,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override
public void run() {
-
- int journalToAdvance = -1;
- Set<Integer> journalLogsReferenced = new HashSet<Integer>();
-
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
- try {
- // Map keys might not be sorted, find the earliest log file to forward acks
- // from and move only those, future cycles can chip away at more as needed.
- // We won't move files that are themselves rewritten on a previous compaction.
- List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
- Collections.sort(journalFileIds);
- for (Integer journalFileId : journalFileIds) {
- DataFile current = journal.getDataFileById(journalFileId);
- if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
- journalToAdvance = journalFileId;
- break;
- }
+ // Map keys might not be sorted, find the earliest log file to forward acks
+ // from and move only those, future cycles can chip away at more as needed.
+ // We won't move files that are themselves rewritten on a previous compaction.
+ List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+ Collections.sort(journalFileIds);
+ int journalToAdvance = -1;
+ for (Integer journalFileId : journalFileIds) {
+ DataFile current = journal.getDataFileById(journalFileId);
+ if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+ journalToAdvance = journalFileId;
+ break;
}
+ }
- // Check if we found one, or if we only found the current file being written to.
- if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
- return;
- }
+ // Check if we found one, or if we only found the current file being written to.
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+ return;
+ }
- journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
+ Set<Integer> journalLogsReferenced =
+ new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
- } finally {
- indexLock.writeLock().unlock();
- }
+ indexLock.writeLock().unlock();
try {
// Background rewrite of the old acks