You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/06 17:44:34 UTC
svn commit: r1143470 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java
Author: tabish
Date: Wed Jul 6 15:44:33 2011
New Revision: 1143470
URL: http://svn.apache.org/viewvc?rev=1143470&view=rev
Log:
Test case for https://issues.apache.org/jira/browse/AMQ-3342 and applied patch with a few mods.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1143470&r1=1143469&r2=1143470&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Jul 6 15:44:33 2011
@@ -85,8 +85,8 @@ import org.apache.kahadb.util.StringMars
import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
-
- protected BrokerService brokerService;
+
+ protected BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
@@ -161,7 +161,7 @@ public class MessageDatabase extends Ser
} else {
os.writeBoolean(false);
}
-
+
if (producerSequenceIdTrackerLocation != null) {
os.writeBoolean(true);
LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
@@ -185,8 +185,8 @@ public class MessageDatabase extends Ser
}
protected PageFile pageFile;
- protected Journal journal;
- protected Metadata metadata = new Metadata();
+ protected Journal journal;
+ protected Metadata metadata = new Metadata();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -205,8 +205,8 @@ public class MessageDatabase extends Ser
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-
-
+
+
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
@@ -230,10 +230,10 @@ public class MessageDatabase extends Ser
unload();
}
- private void loadPageFile() throws IOException {
- this.indexLock.writeLock().lock();
- try {
- final PageFile pageFile = getPageFile();
+ private void loadPageFile() throws IOException {
+ this.indexLock.writeLock().lock();
+ try {
+ final PageFile pageFile = getPageFile();
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -269,13 +269,13 @@ public class MessageDatabase extends Ser
}
}
});
- pageFile.flush();
+ pageFile.flush();
}finally {
this.indexLock.writeLock().unlock();
}
- }
-
- private void startCheckpoint() {
+ }
+
+ private void startCheckpoint() {
synchronized (checkpointThreadLock) {
boolean start = false;
if (checkpointThread == null) {
@@ -319,36 +319,47 @@ public class MessageDatabase extends Ser
checkpointThread.start();
}
}
- }
+ }
- public void open() throws IOException {
- if( opened.compareAndSet(false, true) ) {
+ public void open() throws IOException {
+ if( opened.compareAndSet(false, true) ) {
getJournal().start();
- loadPageFile();
- startCheckpoint();
+ loadPageFile();
+ startCheckpoint();
recover();
- }
- }
+ }
+ }
private void lock() throws IOException {
- if( lockFile == null ) {
+
+ if (lockFile == null) {
File lockFileName = new File(directory, "lock");
lockFile = new LockFile(lockFileName, true);
if (failIfDatabaseIsLocked) {
lockFile.lock();
} else {
- while (true) {
+ boolean locked = false;
+ while ((!isStopped()) && (!isStopping())) {
try {
lockFile.lock();
+ locked = true;
break;
} catch (IOException e) {
- LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e);
+ LOG.info("Database "
+ + lockFileName
+ + " is locked... waiting "
+ + (getDatabaseLockedWaitDelay() / 1000)
+ + " seconds for the database to be unlocked. Reason: "
+ + e);
try {
Thread.sleep(getDatabaseLockedWaitDelay());
} catch (InterruptedException e1) {
}
}
}
+ if (!locked) {
+ throw new IOException("attempt to obtain lock aborted due to shutdown");
+ }
}
}
}
@@ -359,7 +370,7 @@ public class MessageDatabase extends Ser
}
public void load() throws IOException {
-
+
this.indexLock.writeLock().lock();
try {
lock();
@@ -373,45 +384,45 @@ public class MessageDatabase extends Ser
deleteAllMessages = false;
}
- open();
- store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+ open();
+ store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
}finally {
this.indexLock.writeLock().unlock();
}
}
-
- public void close() throws IOException, InterruptedException {
- if( opened.compareAndSet(true, false)) {
- this.indexLock.writeLock().lock();
- try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, true);
- }
- });
- pageFile.unload();
- metadata = new Metadata();
- }finally {
- this.indexLock.writeLock().unlock();
- }
- journal.close();
+
+ public void close() throws IOException, InterruptedException {
+ if( opened.compareAndSet(true, false)) {
+ this.indexLock.writeLock().lock();
+ try {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, true);
+ }
+ });
+ pageFile.unload();
+ metadata = new Metadata();
+ }finally {
+ this.indexLock.writeLock().unlock();
+ }
+ journal.close();
synchronized (checkpointThreadLock) {
- checkpointThread.join();
+ checkpointThread.join();
}
- lockFile.unlock();
- lockFile=null;
- }
- }
-
+ lockFile.unlock();
+ lockFile=null;
+ }
+ }
+
public void unload() throws IOException, InterruptedException {
this.indexLock.writeLock().lock();
try {
if( pageFile != null && pageFile.isLoaded() ) {
metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-
+
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
tx.store(metadata.page, metadataMarshaller, true);
@@ -444,7 +455,7 @@ public class MessageDatabase extends Ser
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
- *
+ *
* @throws IOException
* @throws IOException
* @throws IllegalStateException
@@ -452,28 +463,28 @@ public class MessageDatabase extends Ser
private void recover() throws IllegalStateException, IOException {
this.indexLock.writeLock().lock();
try {
-
- long start = System.currentTimeMillis();
- Location producerAuditPosition = recoverProducerAudit();
- Location lastIndoubtPosition = getRecoveryPosition();
-
- Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
-
- if (recoveryPosition != null) {
- int redoCounter = 0;
- LOG.info("Recovering from the journal ...");
- while (recoveryPosition != null) {
- JournalCommand<?> message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition, lastIndoubtPosition);
- redoCounter++;
- recoveryPosition = journal.getNextLocation(recoveryPosition);
- }
- long end = System.currentTimeMillis();
- LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
- }
-
- // We may have to undo some index updates.
+
+ long start = System.currentTimeMillis();
+ Location producerAuditPosition = recoverProducerAudit();
+ Location lastIndoubtPosition = getRecoveryPosition();
+
+ Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+
+ if (recoveryPosition != null) {
+ int redoCounter = 0;
+ LOG.info("Recovering from the journal ...");
+ while (recoveryPosition != null) {
+ JournalCommand<?> message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition, lastIndoubtPosition);
+ redoCounter++;
+ recoveryPosition = journal.getNextLocation(recoveryPosition);
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+ }
+
+ // We may have to undo some index updates.
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
recoverIndex(tx);
@@ -498,59 +509,59 @@ public class MessageDatabase extends Ser
this.indexLock.writeLock().unlock();
}
}
-
- private Location minimum(Location producerAuditPosition,
+
+ private Location minimum(Location producerAuditPosition,
Location lastIndoubtPosition) {
- Location min = null;
- if (producerAuditPosition != null) {
- min = producerAuditPosition;
- if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
- min = lastIndoubtPosition;
- }
- } else {
- min = lastIndoubtPosition;
- }
- return min;
- }
-
- private Location recoverProducerAudit() throws IOException {
- if (metadata.producerSequenceIdTrackerLocation != null) {
- KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
- try {
- ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
- metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
- } catch (ClassNotFoundException cfe) {
- IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
- ioe.initCause(cfe);
- throw ioe;
- }
- return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
- } else {
- // got no audit stored so got to recreate via replay from start of the journal
- return journal.getNextLocation(null);
- }
+ Location min = null;
+ if (producerAuditPosition != null) {
+ min = producerAuditPosition;
+ if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
+ min = lastIndoubtPosition;
+ }
+ } else {
+ min = lastIndoubtPosition;
+ }
+ return min;
+ }
+
+ private Location recoverProducerAudit() throws IOException {
+ if (metadata.producerSequenceIdTrackerLocation != null) {
+ KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
+ try {
+ ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
+ metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
+ } catch (ClassNotFoundException cfe) {
+ IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
+ ioe.initCause(cfe);
+ throw ioe;
+ }
+ return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+ } else {
+ // got no audit stored so got to recreate via replay from start of the journal
+ return journal.getNextLocation(null);
+ }
}
protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
- // It is possible index updates got applied before the journal updates..
+ // It is possible index updates got applied before the journal updates..
// in that case we need to removed references to messages that are not in the journal
final Location lastAppendLocation = journal.getLastAppendLocation();
long undoCounter=0;
-
+
// Go through all the destinations to see if they have messages past the lastAppendLocation
for (StoredDestination sd : storedDestinations.values()) {
-
+
final ArrayList<Long> matches = new ArrayList<Long>();
// Find all the Locations that are >= than the last Append Location.
sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
- @Override
- protected void matched(Location key, Long value) {
- matches.add(value);
- }
+ @Override
+ protected void matched(Location key, Long value) {
+ matches.add(value);
+ }
});
-
-
+
+
for (Long sequenceId : matches) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
@@ -558,14 +569,14 @@ public class MessageDatabase extends Ser
metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
- }
+ }
}
long end = System.currentTimeMillis();
if( undoCounter > 0 ) {
- // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
- // should do sync writes to the journal.
- LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+ // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
+ // should do sync writes to the journal.
+ LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
undoCounter = 0;
@@ -662,44 +673,44 @@ public class MessageDatabase extends Ser
}
}
}
-
+
end = System.currentTimeMillis();
if( undoCounter > 0 ) {
- // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
- // should do sync writes to the journal.
- LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+ // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
+ // should do sync writes to the journal.
+ LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
- }
+ }
- private Location nextRecoveryPosition;
- private Location lastRecoveryPosition;
+ private Location nextRecoveryPosition;
+ private Location lastRecoveryPosition;
- public void incrementalRecover() throws IOException {
- this.indexLock.writeLock().lock();
+ public void incrementalRecover() throws IOException {
+ this.indexLock.writeLock().lock();
try {
- if( nextRecoveryPosition == null ) {
- if( lastRecoveryPosition==null ) {
- nextRecoveryPosition = getRecoveryPosition();
- } else {
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
- }
- }
- while (nextRecoveryPosition != null) {
- lastRecoveryPosition = nextRecoveryPosition;
- metadata.lastUpdate = lastRecoveryPosition;
- JournalCommand<?> message = load(lastRecoveryPosition);
- process(message, lastRecoveryPosition);
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
- }
+ if( nextRecoveryPosition == null ) {
+ if( lastRecoveryPosition==null ) {
+ nextRecoveryPosition = getRecoveryPosition();
+ } else {
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+ while (nextRecoveryPosition != null) {
+ lastRecoveryPosition = nextRecoveryPosition;
+ metadata.lastUpdate = lastRecoveryPosition;
+ JournalCommand<?> message = load(lastRecoveryPosition);
+ process(message, lastRecoveryPosition);
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
}finally {
this.indexLock.writeLock().unlock();
}
- }
-
+ }
+
public Location getLastUpdatePosition() throws IOException {
return metadata.lastUpdate;
}
-
+
private Location getRecoveryPosition() throws IOException {
if (!this.forceRecoverIndex) {
@@ -708,7 +719,7 @@ public class MessageDatabase extends Ser
if (metadata.firstInProgressTransactionLocation != null) {
return metadata.firstInProgressTransactionLocation;
}
-
+
// Perhaps there were no transactions...
if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
@@ -717,16 +728,16 @@ public class MessageDatabase extends Ser
}
// This loads the first position.
return journal.getNextLocation(null);
- }
+ }
protected void checkpointCleanup(final boolean cleanup) throws IOException {
- long start;
- this.indexLock.writeLock().lock();
+ long start;
+ this.indexLock.writeLock().lock();
try {
start = System.currentTimeMillis();
- if( !opened.get() ) {
- return;
- }
+ if( !opened.get() ) {
+ return;
+ }
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
@@ -735,15 +746,15 @@ public class MessageDatabase extends Ser
}finally {
this.indexLock.writeLock().unlock();
}
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: cleanup took "+(end-start));
- }
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+ }
}
-
- public void checkpoint(Callback closure) throws Exception {
- this.indexLock.writeLock().lock();
+
+ public void checkpoint(Callback closure) throws Exception {
+ this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -754,7 +765,7 @@ public class MessageDatabase extends Ser
}finally {
this.indexLock.writeLock().unlock();
}
- }
+ }
// /////////////////////////////////////////////////////////////////
// Methods call by the broker to update and query the store.
@@ -770,27 +781,27 @@ public class MessageDatabase extends Ser
* during a recovery process.
*/
public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
- if (before != null) {
- before.run();
- }
+ if (before != null) {
+ before.run();
+ }
try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
-
+
long start = System.currentTimeMillis();
Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
process(data, location);
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
- }
-
- this.indexLock.writeLock().lock();
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ }
+
+ this.indexLock.writeLock().lock();
try {
- metadata.lastUpdate = location;
+ metadata.lastUpdate = location;
}finally {
this.indexLock.writeLock().unlock();
}
@@ -801,16 +812,16 @@ public class MessageDatabase extends Ser
after.run();
}
return location;
- } catch (IOException ioe) {
+ } catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
brokerService.handleIOException(ioe);
- throw ioe;
- }
+ throw ioe;
+ }
}
/**
* Loads a previously stored JournalMessage
- *
+ *
* @param location
* @return
* @throws IOException
@@ -832,7 +843,7 @@ public class MessageDatabase extends Ser
message.mergeFramed(is);
return message;
}
-
+
/**
* do minimal recovery till we reach the last inDoubtLocation
* @param data
@@ -1014,7 +1025,7 @@ public class MessageDatabase extends Ser
// /////////////////////////////////////////////////////////////////
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
- private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+ private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@@ -1062,7 +1073,7 @@ public class MessageDatabase extends Ser
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
if (!command.hasSubscriptionKey()) {
-
+
// In the queue case we just remove the message from the index..
Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
if (sequenceId != null) {
@@ -1070,7 +1081,7 @@ public class MessageDatabase extends Ser
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
recordAckMessageReferenceLocation(ackLocation, keys.location);
- }
+ }
}
} else {
// In the topic case we need remove the message once it's been acked
@@ -1110,7 +1121,7 @@ public class MessageDatabase extends Ser
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.remove(tx);
-
+
sd.locationIndex.clear(tx);
sd.locationIndex.unload(tx);
tx.free(sd.locationIndex.getPageId());
@@ -1159,7 +1170,7 @@ public class MessageDatabase extends Ser
removeAckLocationsForSub(tx, sd, subscriptionKey);
}
}
-
+
/**
* @param tx
* @throws IOException
@@ -1183,10 +1194,10 @@ public class MessageDatabase extends Ser
LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
- // Don't GC files under replication
- if( journalFilesBeingReplicated!=null ) {
- gcCandidateSet.removeAll(journalFilesBeingReplicated);
- }
+ // Don't GC files under replication
+ if( journalFilesBeingReplicated!=null ) {
+ gcCandidateSet.removeAll(journalFilesBeingReplicated);
+ }
// Don't GC files after the first in progress tx
if( metadata.firstInProgressTransactionLocation!=null ) {
@@ -1194,58 +1205,58 @@ public class MessageDatabase extends Ser
firstTxLocation = metadata.firstInProgressTransactionLocation;
};
}
-
+
if( firstTxLocation!=null ) {
- while( !gcCandidateSet.isEmpty() ) {
- Integer last = gcCandidateSet.last();
- if( last >= firstTxLocation.getDataFileId() ) {
- gcCandidateSet.remove(last);
- } else {
- break;
- }
- }
+ while( !gcCandidateSet.isEmpty() ) {
+ Integer last = gcCandidateSet.last();
+ if( last >= firstTxLocation.getDataFileId() ) {
+ gcCandidateSet.remove(last);
+ } else {
+ break;
+ }
+ }
LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
}
// Go through all the destinations to see if any of them can remove GC candidates.
for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
- if( gcCandidateSet.isEmpty() ) {
- break;
+ if( gcCandidateSet.isEmpty() ) {
+ break;
}
// Use a visitor to cut down the number of pages that we load
entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1;
public boolean isInterestedInKeysBetween(Location first, Location second) {
- if( first==null ) {
- SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- } else if( second==null ) {
- SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- return !subset.isEmpty();
- } else {
- SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- }
+ if( first==null ) {
+ SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+ if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+ subset.remove(second.getDataFileId());
+ }
+ return !subset.isEmpty();
+ } else if( second==null ) {
+ SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+ if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+ subset.remove(first.getDataFileId());
+ }
+ return !subset.isEmpty();
+ } else {
+ SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+ if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+ subset.remove(first.getDataFileId());
+ }
+ if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+ subset.remove(second.getDataFileId());
+ }
+ return !subset.isEmpty();
+ }
}
public void visit(List<Location> keys, List<Long> values) {
- for (Location l : keys) {
+ for (Location l : keys) {
int fileId = l.getDataFileId();
- if( last != fileId ) {
- gcCandidateSet.remove(fileId);
+ if( last != fileId ) {
+ gcCandidateSet.remove(fileId);
last = fileId;
}
}
@@ -1279,14 +1290,14 @@ public class MessageDatabase extends Ser
}
if( !gcCandidateSet.isEmpty() ) {
- LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
- journal.removeDataFiles(gcCandidateSet);
+ LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+ journal.removeDataFiles(gcCandidateSet);
}
}
-
+
LOG.debug("Checkpoint done.");
}
-
+
private Location checkpointProducerAudit() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
@@ -1297,15 +1308,15 @@ public class MessageDatabase extends Ser
}
public HashSet<Integer> getJournalFilesBeingReplicated() {
- return journalFilesBeingReplicated;
- }
+ return journalFilesBeingReplicated;
+ }
// /////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods.
// /////////////////////////////////////////////////////////////////
- private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+ private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
class StoredSubscription {
SubscriptionInfo subscriptionInfo;
@@ -1313,25 +1324,25 @@ public class MessageDatabase extends Ser
Location lastAckLocation;
Location cursor;
}
-
+
static class MessageKeys {
final String messageId;
final Location location;
-
+
public MessageKeys(String messageId, Location location) {
this.messageId=messageId;
this.location=location;
}
-
+
@Override
public String toString() {
return "["+messageId+","+location+"]";
}
}
-
+
static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
-
+
public MessageKeys readPayload(DataInput dataIn) throws IOException {
return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
}
@@ -1371,7 +1382,7 @@ public class MessageDatabase extends Ser
}
protected class LastAckMarshaller implements Marshaller<LastAck> {
-
+
public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
dataOut.writeLong(object.lastAckedSequence);
dataOut.writeByte(object.priority);
@@ -1400,7 +1411,7 @@ public class MessageDatabase extends Ser
}
class StoredDestination {
-
+
MessageOrderIndex orderIndex = new MessageOrderIndex();
BTreeIndex<Location, Long> locationIndex;
BTreeIndex<String, Long> messageIdIndex;
@@ -1553,7 +1564,7 @@ public class MessageDatabase extends Ser
rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.messageIdIndex.load(tx);
-
+
// If it was a topic...
if (topic) {
@@ -1581,11 +1592,11 @@ public class MessageDatabase extends Ser
Long sequence = orderIterator.next().getKey();
addAckLocation(tx, rc, sequence, entry.getKey());
}
- // modify so it is upgraded
+ // modify so it is upgraded
rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
}
}
-
+
if (rc.orderIndex.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
if (!rc.subscriptionAcks.isEmpty(tx)) {
@@ -1609,7 +1620,7 @@ public class MessageDatabase extends Ser
if (metadata.version < 3) {
// store again after upgrade
metadata.destinations.put(tx, key, rc);
- }
+ }
return rc;
}
@@ -1838,7 +1849,7 @@ public class MessageDatabase extends Ser
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
}
-
+
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
}
@@ -1858,7 +1869,7 @@ public class MessageDatabase extends Ser
public void setDeleteAllMessages(boolean deleteAllMessages) {
this.deleteAllMessages = deleteAllMessages;
}
-
+
public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
this.setIndexWriteBatchSize = setIndexWriteBatchSize;
}
@@ -1866,15 +1877,15 @@ public class MessageDatabase extends Ser
public int getIndexWriteBatchSize() {
return setIndexWriteBatchSize;
}
-
+
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
-
+
boolean isEnableIndexWriteAsync() {
return enableIndexWriteAsync;
}
-
+
public boolean isEnableJournalDiskSyncs() {
return enableJournalDiskSyncs;
}
@@ -1902,40 +1913,40 @@ public class MessageDatabase extends Ser
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
-
+
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
-
+
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
}
-
+
public int getMaxFailoverProducersToTrack() {
return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
}
-
+
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
}
-
+
public int getFailoverProducersAuditDepth() {
return this.metadata.producerSequenceIdTracker.getAuditDepth();
}
-
+
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();
}
- return pageFile;
- }
+ return pageFile;
+ }
- public Journal getJournal() throws IOException {
+ public Journal getJournal() throws IOException {
if (journal == null) {
journal = createJournal();
}
- return journal;
- }
+ return journal;
+ }
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
@@ -1948,7 +1959,7 @@ public class MessageDatabase extends Ser
public boolean isIgnoreMissingJournalfiles() {
return ignoreMissingJournalfiles;
}
-
+
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
}
@@ -1977,9 +1988,9 @@ public class MessageDatabase extends Ser
this.checksumJournalFiles = checksumJournalFiles;
}
- public void setBrokerService(BrokerService brokerService) {
- this.brokerService = brokerService;
- }
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
/**
* @return the archiveDataLogs
@@ -2056,29 +2067,29 @@ public class MessageDatabase extends Ser
long highPriorityCursorPosition;
MessageOrderCursor(){
}
-
+
MessageOrderCursor(long position){
this.defaultCursorPosition=position;
this.lowPriorityCursorPosition=position;
this.highPriorityCursorPosition=position;
}
-
+
MessageOrderCursor(MessageOrderCursor other){
this.defaultCursorPosition=other.defaultCursorPosition;
this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
this.highPriorityCursorPosition=other.highPriorityCursorPosition;
}
-
+
MessageOrderCursor copy() {
return new MessageOrderCursor(this);
}
-
+
void reset() {
this.defaultCursorPosition=0;
this.highPriorityCursorPosition=0;
this.lowPriorityCursorPosition=0;
}
-
+
void increment() {
if (defaultCursorPosition!=0) {
defaultCursorPosition++;
@@ -2103,7 +2114,7 @@ public class MessageDatabase extends Ser
this.highPriorityCursorPosition=other.highPriorityCursorPosition;
}
}
-
+
class MessageOrderIndex {
static final byte HI = 9;
static final byte LO = 0;
@@ -2129,7 +2140,7 @@ public class MessageDatabase extends Ser
}
return result;
}
-
+
void load(Transaction tx) throws IOException {
defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
@@ -2141,7 +2152,7 @@ public class MessageDatabase extends Ser
highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
highPriorityIndex.load(tx);
}
-
+
void allocate(Transaction tx) throws IOException {
defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
if (metadata.version >= 2) {
@@ -2149,7 +2160,7 @@ public class MessageDatabase extends Ser
highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
}
}
-
+
void configureLast(Transaction tx) throws IOException {
// Figure out the next key using the last entry in the destination.
if (highPriorityIndex != null) {
@@ -2174,8 +2185,8 @@ public class MessageDatabase extends Ser
}
}
}
-
-
+
+
void remove(Transaction tx) throws IOException {
defaultPriorityIndex.clear(tx);
defaultPriorityIndex.unload(tx);
@@ -2192,14 +2203,14 @@ public class MessageDatabase extends Ser
tx.free(highPriorityIndex.getPageId());
}
}
-
+
void resetCursorPosition() {
this.cursor.reset();
lastDefaultKey = null;
lastHighKey = null;
lastLowKey = null;
}
-
+
void setBatch(Transaction tx, Long sequence) throws IOException {
if (sequence != null) {
Long nextPosition = new Long(sequence.longValue() + 1);
@@ -2243,7 +2254,7 @@ public class MessageDatabase extends Ser
}
}
}
-
+
void stoppedIterating() {
if (lastDefaultKey!=null) {
cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
@@ -2258,7 +2269,7 @@ public class MessageDatabase extends Ser
lastHighKey = null;
lastLowKey = null;
}
-
+
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
throws IOException {
if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
@@ -2269,18 +2280,18 @@ public class MessageDatabase extends Ser
getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
}
}
-
+
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
deletes.add(iterator.next());
}
-
+
long getNextMessageId(int priority) {
return nextMessageId++;
}
-
+
MessageKeys get(Transaction tx, Long key) throws IOException {
MessageKeys result = defaultPriorityIndex.get(tx, key);
if (result == null) {
@@ -2296,7 +2307,7 @@ public class MessageDatabase extends Ser
}
return result;
}
-
+
MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
return defaultPriorityIndex.put(tx, key, value);
@@ -2306,11 +2317,11 @@ public class MessageDatabase extends Ser
return lowPriorityIndex.put(tx, key, value);
}
}
-
+
Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
return new MessageOrderIterator(tx,cursor);
}
-
+
Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
return new MessageOrderIterator(tx,m);
}
@@ -2324,8 +2335,8 @@ public class MessageDatabase extends Ser
final Iterator<Entry<Long, MessageKeys>>highIterator;
final Iterator<Entry<Long, MessageKeys>>defaultIterator;
final Iterator<Entry<Long, MessageKeys>>lowIterator;
-
-
+
+
MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
@@ -2340,7 +2351,7 @@ public class MessageDatabase extends Ser
this.lowIterator = null;
}
}
-
+
public boolean hasNext() {
if (currentIterator == null) {
if (highIterator != null) {
@@ -2410,10 +2421,10 @@ public class MessageDatabase extends Ser
public void remove() {
throw new UnsupportedOperationException();
}
-
+
}
}
-
+
private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
@@ -2437,10 +2448,10 @@ public class MessageDatabase extends Ser
try {
return (HashSet<String>) oin.readObject();
} catch (ClassNotFoundException cfe) {
- IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
- ioe.initCause(cfe);
- throw ioe;
- }
+ IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
+ ioe.initCause(cfe);
+ throw ioe;
+ }
}
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java?rev=1143470&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java Wed Jul 6 15:44:33 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterSlaveSlaveShutdownTest extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveSlaveShutdownTest.class);
+
+ BrokerService master;
+ BrokerService slave;
+
+ private void createMasterBroker() throws Exception {
+ final BrokerService master = new BrokerService();
+ master.setBrokerName("master");
+ master.setPersistent(false);
+ master.addConnector("tcp://localhost:0");
+
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ kaha.deleteAllMessages();
+ master.setPersistenceAdapter(kaha);
+
+ this.master = master;
+ }
+
+ private void createSlaveBroker() throws Exception {
+
+ final BrokerService slave = new BrokerService();
+ slave.setBrokerName("slave");
+ slave.setPersistent(false);
+ URI masterUri = master.getTransportConnectors().get(0).getConnectUri();
+ slave.setMasterConnectorURI(masterUri.toString());
+ slave.setUseJmx(false);
+ slave.getManagementContext().setCreateConnector(false);
+
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(new File("target/activemq-data/kahadb"));
+ slave.setPersistenceAdapter(kaha);
+
+ this.slave = slave;
+ }
+
+ public void tearDown() {
+ try {
+ this.master.stop();
+ } catch (Exception e) {
+ }
+ this.master.waitUntilStopped();
+ this.master = null;
+ this.slave = null;
+ }
+
+ public void testSlaveShutsdownWhenWaitingForLock() throws Exception {
+
+ createMasterBroker();
+ createSlaveBroker();
+
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ master.start();
+ } catch (Exception e) {
+ LOG.warn("Exception starting master: " + e);
+ e.printStackTrace();
+ }
+ }
+ });
+ master.waitUntilStarted();
+
+ Thread.sleep(2000);
+
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ slave.start();
+ } catch (Exception e) {
+ LOG.warn("Exception starting master: " + e);
+ e.printStackTrace();
+ }
+ }
+ });
+ slave.waitUntilStarted();
+ Thread.sleep(TimeUnit.SECONDS.toMillis(15));
+
+ LOG.info("killing slave..");
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ slave.stop();
+ } catch (Exception e) {
+ LOG.warn("Exception starting master: " + e);
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(15));
+ assertFalse(slave.isStarted());
+ slave.waitUntilStopped();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java
------------------------------------------------------------------------------
svn:eol-style = native