You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/15 20:08:16 UTC
svn commit: r1468171 - in /activemq/trunk:
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/
activemq-kahadb-store/src/main/proto/
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/
activemq-unit-tests/src/test/java/org/apa...
Author: tabish
Date: Mon Apr 15 18:08:15 2013
New Revision: 1468171
URL: http://svn.apache.org/r1468171
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4212
fix for: https://issues.apache.org/jira/browse/AMQ-2832
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java (with props)
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Apr 15 18:08:15 2013
@@ -54,9 +54,9 @@ import org.apache.activemq.ActiveMQMessa
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -85,7 +85,12 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,13 +106,12 @@ public abstract class MessageDatabase ex
UNMATCHED = new Buffer(new byte[]{});
}
private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
- private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
static final int CLOSED_STATE = 1;
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
- static final int VERSION = 4;
+ static final int VERSION = 5;
protected class Metadata {
protected Page<Metadata> page;
@@ -116,7 +120,9 @@ public abstract class MessageDatabase ex
protected Location lastUpdate;
protected Location firstInProgressTransactionLocation;
protected Location producerSequenceIdTrackerLocation = null;
+ protected Location ackMessageFileMapLocation = null;
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
+ protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
protected int version = VERSION;
public void read(DataInput is) throws IOException {
state = is.readInt();
@@ -144,6 +150,11 @@ public abstract class MessageDatabase ex
} catch (EOFException expectedOnUpgrade) {
version=1;
}
+ if (version >= 5 && is.readBoolean()) {
+ ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
+ } else {
+ ackMessageFileMapLocation = null;
+ }
LOG.info("KahaDB is version " + version);
}
@@ -172,6 +183,12 @@ public abstract class MessageDatabase ex
os.writeBoolean(false);
}
os.writeInt(VERSION);
+ if (ackMessageFileMapLocation != null) {
+ os.writeBoolean(true);
+ LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
+ } else {
+ os.writeBoolean(false);
+ }
}
}
@@ -452,6 +469,7 @@ public abstract class MessageDatabase ex
return range;
}
+ @SuppressWarnings("rawtypes")
private void trackMaxAndMin(Location[] range, List<Operation> ops) {
Location t = ops.get(0).getLocation();
if (range[0]==null || t.compareTo(range[0]) <= 0) {
@@ -473,6 +491,7 @@ public abstract class MessageDatabase ex
}
HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
+ @SuppressWarnings("rawtypes")
public void track(Operation operation) {
if (location == null ) {
location = operation.getLocation();
@@ -543,9 +562,11 @@ public abstract class MessageDatabase ex
long start = System.currentTimeMillis();
Location producerAuditPosition = recoverProducerAudit();
+ Location ackMessageFileLocation = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
- Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+ Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
+ recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
int redoCounter = 0;
@@ -631,6 +652,24 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("unchecked")
+ private Location recoverAckMessageFileMap() throws IOException {
+ if (metadata.ackMessageFileMapLocation != null) {
+ KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
+ try {
+ ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
+ metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
+ return journal.getNextLocation(metadata.ackMessageFileMapLocation);
+ } catch (Exception e) {
+ LOG.warn("Cannot recover ackMessageFileMap", e);
+ return journal.getNextLocation(null);
+ }
+ } else {
+ // No ackMessageFileMap was stored, so it has to be recreated by replaying from the start of the journal
+ return journal.getNextLocation(null);
+ }
+ }
+
protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
// It is possible index updates got applied before the journal updates..
@@ -760,7 +799,6 @@ public abstract class MessageDatabase ex
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
-
} else {
throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
}
@@ -947,6 +985,9 @@ public abstract class MessageDatabase ex
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
if( type == null ) {
+ try {
+ is.close();
+ } catch (IOException e) {}
throw new IOException("Could not load journal record. Invalid location: "+location);
}
JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
@@ -1024,6 +1065,11 @@ public abstract class MessageDatabase ex
}
@Override
+ public void visit(KahaAckMessageFileMapCommand command) throws IOException {
+ processLocation(location);
+ }
+
+ @Override
public void visit(KahaTraceCommand command) {
processLocation(location);
}
@@ -1223,20 +1269,16 @@ public abstract class MessageDatabase ex
}
} else {
// If the message ID as indexed, then the broker asked us to
- // store a DUP
- // message. Bad BOY! Don't do it, and log a warning.
+ // store a DUP message. Bad BOY! Don't do it, and log a warning.
LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
sd.locationIndex.remove(tx, location);
rollbackStatsOnDuplicate(command.getDestination());
}
} else {
- // restore the previous value.. Looks like this was a redo of a
- // previously
- // added message. We don't want to assign it a new id as the other
- // indexes would
+ // restore the previous value.. Looks like this was a redo of a previously
+ // added message. We don't want to assign it a new id as the other indexes would
// be wrong..
- //
sd.locationIndex.put(tx, location, previous);
}
// record this id in any event, initial send or recovery
@@ -1276,6 +1318,11 @@ public abstract class MessageDatabase ex
byte priority = sd.orderIndex.lastGetPriority();
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
}
+
+ MessageKeys keys = sd.orderIndex.get(tx, sequence);
+ if (keys != null) {
+ recordAckMessageReferenceLocation(ackLocation, keys.location);
+ }
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, sequence);
} else if (LOG.isDebugEnabled()) {
@@ -1286,13 +1333,12 @@ public abstract class MessageDatabase ex
metadata.lastUpdate = ackLocation;
}
- Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
- Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
+ Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds.add(messageLocation.getDataFileId());
- ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
+ metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
} else {
Integer id = Integer.valueOf(messageLocation.getDataFileId());
if (!referenceFileIds.contains(id)) {
@@ -1325,6 +1371,10 @@ public abstract class MessageDatabase ex
sd.ackPositions.clear(tx);
sd.ackPositions.unload(tx);
tx.free(sd.ackPositions.getHeadPageId());
+
+ sd.subLocations.clear(tx);
+ sd.subLocations.unload(tx);
+ tx.free(sd.subLocations.getHeadPageId());
}
String key = key(command.getDestination());
@@ -1339,6 +1389,7 @@ public abstract class MessageDatabase ex
// If set then we are creating it.. otherwise we are destroying the sub
if (command.hasSubscriptionInfo()) {
sd.subscriptions.put(tx, subscriptionKey, command);
+ sd.subLocations.put(tx, subscriptionKey, location);
long ackLocation=NOT_ACKED;
if (!command.getRetroactive()) {
ackLocation = sd.orderIndex.nextMessageId-1;
@@ -1350,6 +1401,7 @@ public abstract class MessageDatabase ex
} else {
// delete the sub...
sd.subscriptions.remove(tx, subscriptionKey);
+ sd.subLocations.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey);
@@ -1394,6 +1446,7 @@ public abstract class MessageDatabase ex
metadata.state = OPEN_STATE;
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
+ metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
Location[] inProgressTxRange = getInProgressTxLocationRange();
metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
tx.store(metadata.page, metadataMarshaller, true);
@@ -1432,6 +1485,14 @@ public abstract class MessageDatabase ex
}
}
+ if (metadata.ackMessageFileMapLocation != null) {
+ int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
+ gcCandidateSet.remove(dataFileId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
+ }
+ }
+
// Don't GC files referenced by in-progress tx
if (inProgressTxRange[0] != null) {
for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
@@ -1488,6 +1549,45 @@ public abstract class MessageDatabase ex
}
}
});
+
+ // Durable Subscription
+ if (entry.getValue().subLocations != null) {
+ Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
+ while (iter.hasNext()) {
+ Entry<String, Location> subscription = iter.next();
+ int dataFileId = subscription.getValue().getDataFileId();
+
+ // Move subscription along if it has no outstanding messages that need to be ack'd
+ // and it's in the last log file in the journal.
+ if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
+ final StoredDestination destination = entry.getValue();
+ final String subscriptionKey = subscription.getKey();
+ SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
+
+ // When pending is size one that is the next message Id meaning there
+ // are no pending messages currently.
+ if (pendingAcks == null || pendingAcks.size() <= 1) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
+ }
+
+ final KahaSubscriptionCommand kahaSub =
+ destination.subscriptions.get(tx, subscriptionKey);
+ destination.subLocations.put(
+ tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
+
+ // Skips the remove from candidates if we rewrote the subscription
+ // in order to prevent duplicate subscription commands on recover.
+ // If another subscription is on the same file and isn't rewritten
+ // then it will remove the file from the set.
+ continue;
+ }
+ }
+
+ gcCandidateSet.remove(dataFileId);
+ }
+ }
+
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
}
@@ -1501,7 +1601,7 @@ public abstract class MessageDatabase ex
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
Integer candidate = candidates.next();
- Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
+ Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
if (referencedFileIds != null) {
for (Integer referencedFileId : referencedFileIds) {
if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
@@ -1511,7 +1611,7 @@ public abstract class MessageDatabase ex
}
}
if (gcCandidateSet.contains(candidate)) {
- ackMessageFileMap.remove(candidate);
+ metadata.ackMessageFileMap.remove(candidate);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("not removing data file: " + candidate
@@ -1537,6 +1637,7 @@ public abstract class MessageDatabase ex
public void run() {
}
};
+
private Location checkpointProducerAudit() throws IOException {
if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1556,6 +1657,35 @@ public abstract class MessageDatabase ex
return metadata.producerSequenceIdTrackerLocation;
}
+ private Location checkpointAckMessageFileMap() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oout = new ObjectOutputStream(baos);
+ oout.writeObject(metadata.ackMessageFileMap);
+ oout.flush();
+ oout.close();
+ // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+ Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
+ try {
+ location.getLatch().await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ }
+ return location;
+ }
+
+ private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
+
+ ByteSequence sequence = toByteSequence(subscription);
+ Location location = journal.write(sequence, nullCompletionCallback) ;
+
+ try {
+ location.getLatch().await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ }
+ return location;
+ }
+
public HashSet<Integer> getJournalFilesBeingReplicated() {
return journalFilesBeingReplicated;
}
@@ -1566,13 +1696,6 @@ public abstract class MessageDatabase ex
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
- class StoredSubscription {
- SubscriptionInfo subscriptionInfo;
- String lastAckId;
- Location lastAckLocation;
- Location cursor;
- }
-
static class MessageKeys {
final String messageId;
final Location location;
@@ -1677,6 +1800,7 @@ public abstract class MessageDatabase ex
BTreeIndex<String, LastAck> subscriptionAcks;
HashMap<String, MessageOrderCursor> subscriptionCursors;
ListIndex<String, SequenceSet> ackPositions;
+ ListIndex<String, Location> subLocations;
// Transient data used to track which Messages are no longer needed.
final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
@@ -1730,6 +1854,9 @@ public abstract class MessageDatabase ex
// Now move the pending messages to ack data into the store backed
// structure.
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+ value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
+ value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+ value.ackPositions.load(tx);
for(String subscriptionKey : temp.keySet()) {
value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
}
@@ -1737,26 +1864,41 @@ public abstract class MessageDatabase ex
}
});
}
- }
- if (metadata.version >= 2) {
- value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
- value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
- } else {
+
+ if (metadata.version >= 5) {
+ value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
+ } else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
- value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
- value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
- value.orderIndex.lowPriorityIndex.load(tx);
-
- value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
- value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
- value.orderIndex.highPriorityIndex.load(tx);
+ value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
+ value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+ value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+ value.subLocations.load(tx);
}
});
+ }
+ }
+ if (metadata.version >= 2) {
+ value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ } else {
+ // upgrade
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ value.orderIndex.lowPriorityIndex.load(tx);
+
+ value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ value.orderIndex.highPriorityIndex.load(tx);
+ }
+ });
}
return value;
@@ -1772,6 +1914,7 @@ public abstract class MessageDatabase ex
dataOut.writeLong(value.subscriptions.getPageId());
dataOut.writeLong(value.subscriptionAcks.getPageId());
dataOut.writeLong(value.ackPositions.getHeadPageId());
+ dataOut.writeLong(value.subLocations.getHeadPageId());
} else {
dataOut.writeBoolean(false);
}
@@ -1840,6 +1983,7 @@ public abstract class MessageDatabase ex
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+ rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
}
metadata.destinations.put(tx, key, rc);
}
@@ -1873,6 +2017,10 @@ public abstract class MessageDatabase ex
rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
rc.ackPositions.load(tx);
+ rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+ rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.subLocations.load(tx);
+
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
if (metadata.version < 3) {
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Mon Apr 15 18:08:15 2013
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb
import java.io.IOException;
+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
@@ -53,8 +54,10 @@ public class Visitor {
public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
}
-
+
public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException {
}
+ public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
+ }
}
Modified: activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto Mon Apr 15 18:08:15 2013
@@ -30,6 +30,7 @@ enum KahaEntryType {
KAHA_REMOVE_DESTINATION_COMMAND = 6;
KAHA_SUBSCRIPTION_COMMAND = 7;
KAHA_PRODUCER_AUDIT_COMMAND = 8;
+ KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
}
message KahaTraceCommand {
@@ -40,7 +41,7 @@ message KahaTraceCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
-
+
required string message = 1;
}
@@ -48,7 +49,7 @@ message KahaAddMessageCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
-
+
optional KahaTransactionInfo transaction_info=1;
required KahaDestination destination = 2;
required string messageId = 3;
@@ -120,10 +121,22 @@ message KahaProducerAuditCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
-
+
required bytes audit = 1;
}
+message KahaAckMessageFileMapCommand {
+ // We make use of the wonky comment style below because the following options
+ // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+ // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+ //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAckMessageFileMapCommand>";
+ //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+ //| option java_type_method = "KahaEntryType";
+
+ required bytes ackMessageFileMap = 1;
+}
+
message KahaDestination {
enum DestinationType {
QUEUE = 0;
@@ -154,8 +167,8 @@ message KahaXATransactionId {
}
message KahaLocation {
- required int32 log_id = 1;
- required int32 offset = 2;
+ required int32 log_id = 1;
+ required int32 offset = 2;
}
// TODO things to ponder
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java Mon Apr 15 18:08:15 2013
@@ -16,14 +16,13 @@
*/
package org.apache.activemq.bugs;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -31,51 +30,95 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.Topic;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQ2832Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
BrokerService broker = null;
+ private ActiveMQConnectionFactory cf;
private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+ private String connectionUri;
+
+ protected void startBroker() throws Exception {
+ doStartBroker(true, false);
+ }
+
+ protected void restartBroker() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ doStartBroker(false, false);
+ }
- protected void startBroker(boolean delete) throws Exception {
+ protected void recoverBroker() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ doStartBroker(false, true);
+ }
+
+ private void doStartBroker(boolean delete, boolean recover) throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(delete);
broker.setPersistent(true);
- broker.setUseJmx(false);
+ broker.setUseJmx(true);
broker.addConnector("tcp://localhost:0");
- configurePersistence(broker, delete);
+ configurePersistence(broker, recover);
+
+ connectionUri = "vm://localhost?create=false";
+ cf = new ActiveMQConnectionFactory(connectionUri);
broker.start();
LOG.info("Starting broker..");
}
- protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+ protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
// ensure there are a bunch of data files but multiple entries in each
adapter.setJournalMaxFileLength(1024 * 20);
// speed up the test case, checkpoint an cleanup early and often
- adapter.setCheckpointInterval(500);
- adapter.setCleanupInterval(500);
+ adapter.setCheckpointInterval(5000);
+ adapter.setCleanupInterval(5000);
- if (!deleteAllOnStart) {
+ if (recover) {
adapter.setForceRecoverIndex(true);
}
+ }
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
}
@Test
public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
- startBroker(true);
+ startBroker();
StagedConsumer consumer = new StagedConsumer();
int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
@@ -102,9 +145,9 @@ public class AMQ2832Test {
broker.stop();
broker.waitUntilStopped();
- startBroker(false);
+ recoverBroker();
- consumer = new StagedConsumer();
+ consumer = new StagedConsumer();
// need to force recovery?
Message msg = consumer.receive(1, 5);
@@ -115,10 +158,99 @@ public class AMQ2832Test {
msg = consumer.receive(1, 5);
assertEquals("Only one messages left after recovery: " + msg, null, msg);
consumer.close();
+ }
+ @Test
+ public void testAlternateLossScenario() throws Exception {
+
+ startBroker();
+
+ ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+ ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
+ ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+ // This ensures that data file 1 never goes away.
+ createInactiveDurableSub(topic);
+ assertEquals(1, getNumberOfJournalFiles());
+
+ // One Queue Message that will be acked in another data file.
+ produceMessages(queue, 1);
+ assertEquals(1, getNumberOfJournalFiles());
+
+ // Add some messages to consume space
+ produceMessages(disposable, 50);
+
+ int dataFilesCount = getNumberOfJournalFiles();
+ assertTrue(dataFilesCount > 1);
+
+ // Create an ack for the single message on this queue
+ drainQueue(queue);
+
+ // Add some more messages to consume space beyond the data file with the ack
+ produceMessages(disposable, 50);
+
+ assertTrue(dataFilesCount < getNumberOfJournalFiles());
+ dataFilesCount = getNumberOfJournalFiles();
+
+ restartBroker();
+
+ // Clear out all queue data
+ broker.getAdminView().removeQueue(disposable.getQueueName());
+
+ // Once this becomes true our ack could be lost.
+ assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getNumberOfJournalFiles() <= 3;
+ }
+ }, TimeUnit.MINUTES.toMillis(3)));
+
+ // Recover and the Message should not be replayed but if the old MessageAck is lost
+ // then it could be.
+ recoverBroker();
+
+ assertTrue(drainQueue(queue) == 0);
}
- private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+ private int getNumberOfJournalFiles() throws IOException {
+ Collection<DataFile> files =
+ ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+ int reality = 0;
+ for (DataFile file : files) {
+ if (file != null) {
+ reality++;
+ }
+ }
+
+ return reality;
+ }
+
+ private void createInactiveDurableSub(Topic topic) throws Exception {
+ Connection connection = cf.createConnection();
+ connection.setClientID("Inactive");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+ consumer.close();
+ connection.close();
+ produceMessages(topic, 1);
+ }
+
+ private int drainQueue(Queue queue) throws Exception {
+ Connection connection = cf.createConnection();
+ connection.setClientID("Inactive");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ int count = 0;
+ while (consumer.receive(5000) != null) {
+ count++;
+ }
+ consumer.close();
+ connection.close();
+ return count;
+ }
+
+ private int produceMessages(Destination destination, int numToSend) throws Exception {
int sent = 0;
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@@ -133,10 +265,14 @@ public class AMQ2832Test {
} finally {
connection.close();
}
-
+
return sent;
}
+ // Sends numToSend messages to `destination` by delegating to produceMessages and
+ // returns whatever that call returns. NOTE(review): `destination` is not declared in
+ // this hunk; presumably it is a field of the test class - confirm.
+ private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+ return produceMessages(destination, numToSend);
+ }
+
final String payload = new String(new byte[1024]);
private Message createMessage(Session session, int i) throws Exception {
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java?rev=1468171&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java Mon Apr 15 18:08:15 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Tests that an inactive durable subscription, and the acks it left behind, survive a
+  * broker restart, KahaDB journal cleanup and a forced index recovery.
+  */
+ public class AMQ4212Test {
+
+     private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
+
+     /** Default number of messages sent by {@link #sendMessages(Destination)}. */
+     private static final int MSG_COUNT = 256;
+
+     private BrokerService service;
+     private String connectionUri;
+     private ActiveMQConnectionFactory cf;
+
+     @Before
+     public void setUp() throws Exception {
+         createBroker(true, false);
+     }
+
+     /**
+      * Creates and starts a persistent, JMX-enabled broker backed by KahaDB in the "KahaDB"
+      * directory, then points the connection factory at it.
+      *
+      * @param deleteAllMessages whether the store is wiped on startup
+      * @param recover whether KahaDB is told to force recovery of its index
+      * @throws Exception if the broker fails to start
+      */
+     public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
+         service = new BrokerService();
+         service.setBrokerName("InactiveSubTest");
+         service.setDeleteAllMessagesOnStartup(deleteAllMessages);
+         service.setAdvisorySupport(false);
+         service.setPersistent(true);
+         service.setUseJmx(true);
+         service.setKeepDurableSubsActive(false);
+
+         KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
+         File dataFile = new File("KahaDB");
+         pa.setDirectory(dataFile);
+         // 10 KB journal files; the tests rely on their messages spreading over several files.
+         pa.setJournalMaxFileLength(10 * 1024);
+         pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
+         pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
+         pa.setForceRecoverIndex(recover);
+
+         service.setPersistenceAdapter(pa);
+         service.start();
+         service.waitUntilStarted();
+
+         connectionUri = "vm://InactiveSubTest?create=false";
+         cf = new ActiveMQConnectionFactory(connectionUri);
+     }
+
+     /** Stops the broker and starts it again on the existing store without forced recovery. */
+     private void restartBroker() throws Exception {
+         stopBroker();
+         createBroker(false, false);
+     }
+
+     /** Stops the broker and starts it again on the existing store with forced index recovery. */
+     private void recoverBroker() throws Exception {
+         stopBroker();
+         createBroker(false, true);
+     }
+
+     @After
+     public void stopBroker() throws Exception {
+         if (service != null) {
+             service.stop();
+             service.waitUntilStopped();
+             service = null;
+         }
+     }
+
+     @Test
+     public void testDirableSubPrefetchRecovered() throws Exception {
+
+         ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+         ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+         // Send to a Queue to create some journal files
+         sendMessages(queue);
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+         createInactiveDurableSub(topic);
+         assertInactiveDurableSubExists();
+
+         // Now send some more to the queue to create even more files.
+         sendMessages(queue);
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+         assertTrue(getNumberOfJournalFiles() > 1);
+
+         LOG.info("Restarting the broker.");
+         restartBroker();
+         LOG.info("Restarted the broker.");
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+         assertTrue(getNumberOfJournalFiles() > 1);
+
+         assertInactiveDurableSubExists();
+
+         // Clear out all queue data
+         service.getAdminView().removeQueue(queue.getQueueName());
+
+         assertJournalFilesAtMost(2, TimeUnit.MINUTES.toMillis(2));
+
+         LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
+         // Send some messages to the inactive destination
+         sendMessages(topic);
+
+         LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
+         assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
+
+         LOG.info("Recovering the broker.");
+         recoverBroker();
+         LOG.info("Recovered the broker.");
+
+         assertInactiveDurableSubExists();
+     }
+
+     @Test
+     public void testDurableAcksNotDropped() throws Exception {
+
+         ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+         ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+         // Create durable sub in first data file.
+         createInactiveDurableSub(topic);
+         assertInactiveDurableSubExists();
+
+         // Send to a Topic
+         sendMessages(topic, 1);
+
+         // Send to a Queue to create some journal files
+         sendMessages(queue);
+
+         LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+         // Consume all the Messages leaving acks behind.
+         consumeDurableMessages(topic, 1);
+
+         LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+         // Now send some more to the queue to create even more files.
+         sendMessages(queue);
+
+         LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
+         assertTrue(getNumberOfJournalFiles() > 1);
+
+         LOG.info("Restarting the broker.");
+         restartBroker();
+         LOG.info("Restarted the broker.");
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+         assertTrue(getNumberOfJournalFiles() > 1);
+
+         assertInactiveDurableSubExists();
+
+         // Clear out all queue data
+         service.getAdminView().removeQueue(queue.getQueueName());
+
+         assertJournalFilesAtMost(3, TimeUnit.MINUTES.toMillis(3));
+
+         // See if we receive any message they should all be acked.
+         tryConsumeExpectNone(topic);
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+         LOG.info("Recovering the broker.");
+         recoverBroker();
+         LOG.info("Recovered the broker.");
+
+         LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+         assertInactiveDurableSubExists();
+
+         // See if we receive any message they should all be acked.
+         tryConsumeExpectNone(topic);
+
+         // Wait first, then build the message, so the reported count is the post-wait value.
+         boolean singleFileLeft = Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return getNumberOfJournalFiles() == 1;
+             }
+         }, TimeUnit.MINUTES.toMillis(1));
+         assertTrue("Exactly one journal file expected, was " + getNumberOfJournalFiles(), singleFileLeft);
+     }
+
+     /**
+      * Waits (using the default timeout of {@code Wait.waitFor}) until the broker's admin view
+      * reports exactly one inactive durable topic subscriber, and fails the test otherwise.
+      */
+     private void assertInactiveDurableSubExists() throws Exception {
+         assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                 return subs != null && subs.length == 1;
+             }
+         }));
+     }
+
+     /**
+      * Waits until the journal holds at most {@code limit} data files and fails the test if
+      * that does not happen within the timeout.
+      *
+      * @param limit the maximum acceptable number of journal files
+      * @param timeoutMillis how long to wait, in milliseconds
+      */
+     private void assertJournalFilesAtMost(final int limit, long timeoutMillis) throws Exception {
+         boolean reached = Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return getNumberOfJournalFiles() <= limit;
+             }
+         }, timeoutMillis);
+         // The count is read after the wait so a failure reports the current state.
+         assertTrue("No more than " + limit + " journal files expected, was " + getNumberOfJournalFiles(), reached);
+     }
+
+     /**
+      * Counts the non-null data files currently registered in the KahaDB journal's file map.
+      *
+      * @return the number of non-null journal data files
+      */
+     private int getNumberOfJournalFiles() throws IOException {
+         Collection<DataFile> files =
+             ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+         int reality = 0;
+         for (DataFile file : files) {
+             if (file != null) {
+                 reality++;
+             }
+         }
+
+         return reality;
+     }
+
+     /**
+      * Registers the durable subscription "Inactive" (client id "Inactive") on the topic and
+      * disconnects right away.
+      */
+     private void createInactiveDurableSub(Topic topic) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             connection.setClientID("Inactive");
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+             consumer.close();
+         } finally {
+             connection.close();
+         }
+     }
+
+     /**
+      * Re-attaches to the "Inactive" durable subscription and receives exactly {@code count}
+      * messages, failing the test if any receive waits 10 seconds without a message.
+      */
+     private void consumeDurableMessages(Topic topic, int count) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             connection.setClientID("Inactive");
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+             connection.start();
+             for (int i = 0; i < count; ++i) {
+                 if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
+                     fail("should have received a message");
+                 }
+             }
+             consumer.close();
+         } finally {
+             // fail() throws, so the close must sit in finally to avoid leaking the connection.
+             connection.close();
+         }
+     }
+
+     /**
+      * Re-attaches to the "Inactive" durable subscription and fails the test if a message
+      * arrives within 10 seconds.
+      */
+     private void tryConsumeExpectNone(Topic topic) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             connection.setClientID("Inactive");
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+             connection.start();
+             if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
+                 fail("Should be no messages for this durable.");
+             }
+             consumer.close();
+         } finally {
+             connection.close();
+         }
+     }
+
+     /**
+      * Re-attaches to the "Inactive" durable subscription and receives until a receive call
+      * waits 10 seconds without a message.
+      *
+      * @return the number of messages received
+      */
+     private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             connection.setClientID("Inactive");
+             connection.start();
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+
+             int count = 0;
+
+             while (consumer.receive(10000) != null) {
+                 count++;
+             }
+
+             consumer.close();
+
+             return count;
+         } finally {
+             connection.close();
+         }
+     }
+
+     /** Sends {@link #MSG_COUNT} persistent text messages to the destination. */
+     private void sendMessages(Destination destination) throws Exception {
+         sendMessages(destination, MSG_COUNT);
+     }
+
+     /**
+      * Sends {@code count} persistent text messages to the destination.
+      *
+      * @param destination where the messages are sent
+      * @param count how many messages to send
+      */
+     private void sendMessages(Destination destination, int count) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageProducer producer = session.createProducer(destination);
+             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+             for (int i = 0; i < count; ++i) {
+                 TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
+                 producer.send(message);
+             }
+         } finally {
+             connection.close();
+         }
+     }
+
+ }
Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Mon Apr 15 18:08:15 2013
@@ -16,17 +16,26 @@
*/
package org.apache.activemq.store.kahadb;
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
import junit.framework.TestCase;
-import org.apache.activemq.broker.BrokerService;
+
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
-
-import javax.jms.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
/**
* @author chirino
@@ -43,36 +52,36 @@ public class KahaDBVersionTest extends T
}
static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class);
- final static File VERSION_1_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
- final static File VERSION_2_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
- final static File VERSION_3_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+ final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
+ final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
+ final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+ final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
BrokerService broker = null;
protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
-
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(kaha);
broker.start();
return broker;
-
}
+ @Override
protected void tearDown() throws Exception {
if (broker != null) {
broker.stop();
}
}
-
+
public void XtestCreateStore() throws Exception {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX");
+ File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
IOHelper.deleteFile(dir);
kaha.setDirectory(dir);
- kaha.setJournalMaxFileLength(1024*1024);
+ kaha.setJournalMaxFileLength(1024 * 1024);
BrokerService broker = createBroker(kaha);
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.setClientID("test");
connection.start();
@@ -85,33 +94,37 @@ public class KahaDBVersionTest extends T
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
- MessageConsumer consumer = session.createDurableSubscriber(topic,"test");
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
consumer.close();
MessageProducer producer = session.createProducer(topic);
producer.setPriority(9);
- for (int i =0; i < numToSend; i++) {
- Message msg = session.createTextMessage("test message:"+i);
+ for (int i = 0; i < numToSend; i++) {
+ Message msg = session.createTextMessage("test message:" + i);
producer.send(msg);
}
- LOG.info("sent " + numToSend +" to topic");
+ LOG.info("sent " + numToSend + " to topic");
producer = session.createProducer(queue);
- for (int i =0; i < numToSend; i++) {
- Message msg = session.createTextMessage("test message:"+i);
+ for (int i = 0; i < numToSend; i++) {
+ Message msg = session.createTextMessage("test message:" + i);
producer.send(msg);
}
- LOG.info("sent " + numToSend +" to queue");
+ LOG.info("sent " + numToSend + " to queue");
+ }
+
+ public void testVersion1Conversion() throws Exception {
+ doConvertRestartCycle(VERSION_1_DB);
}
- public void testVersion1Conversion() throws Exception{
- doConvertRestartCycle(VERSION_1_DB);
+ public void testVersion2Conversion() throws Exception {
+ doConvertRestartCycle(VERSION_2_DB);
}
- public void testVersion2Conversion() throws Exception{
- doConvertRestartCycle(VERSION_2_DB);
+ public void testVersion3Conversion() throws Exception {
+ doConvertRestartCycle(VERSION_3_DB);
}
- public void testVersion3Conversion() throws Exception{
- doConvertRestartCycle(VERSION_3_DB);
+ public void testVersion4Conversion() throws Exception {
+ doConvertRestartCycle(VERSION_4_DB);
}
public void doConvertRestartCycle(File existingStore) throws Exception {
@@ -145,7 +158,7 @@ public class KahaDBVersionTest extends T
for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
TextMessage msg = (TextMessage) queueConsumer.receive(10000);
count++;
- //System.err.println(msg.getText());
+ // System.err.println(msg.getText());
assertNotNull(msg);
}
LOG.info("Consumed " + count + " from queue");
@@ -154,12 +167,12 @@ public class KahaDBVersionTest extends T
for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
count++;
- //System.err.println(msg.getText());
+ // System.err.println(msg.getText());
assertNotNull(msg);
}
LOG.info("Consumed " + count + " from topic");
connection.close();
-
+
broker.stop();
}
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java Mon Apr 15 18:08:15 2013
@@ -20,13 +20,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+
import junit.framework.Test;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
@@ -44,8 +47,9 @@ public class DurableSubsOfflineSelectorC
public int messageCount = 10000;
private BrokerService broker;
private ActiveMQTopic topic;
- private List<Throwable> exceptions = new ArrayList<Throwable>();
+ private final List<Throwable> exceptions = new ArrayList<Throwable>();
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
connectionFactory.setWatchTopicAdvisories(false);
@@ -68,6 +72,7 @@ public class DurableSubsOfflineSelectorC
return suite(DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.class);
}
+ @Override
protected void setUp() throws Exception {
exceptions.clear();
topic = (ActiveMQTopic) createDestination();
@@ -75,6 +80,7 @@ public class DurableSubsOfflineSelectorC
super.setUp();
}
+ @Override
protected void tearDown() throws Exception {
super.tearDown();
destroyBroker();
@@ -128,6 +134,7 @@ public class DurableSubsOfflineSelectorC
final CountDownLatch goOn = new CountDownLatch(1);
Thread sendThread = new Thread() {
+ @Override
public void run() {
try {
@@ -208,10 +215,10 @@ public class DurableSubsOfflineSelectorC
LOG.info("Store free page count: " + store.getPageFile().getFreePageCount());
LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount()));
- assertTrue("no leak of pages, always use just 10", Wait.waitFor(new Wait.Condition() {
+ assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return 10 == store.getPageFile().getPageCount() -
+ return 11 == store.getPageFile().getPageCount() -
store.getPageFile().getFreePageCount();
}
}, TimeUnit.SECONDS.toMillis(10)));
@@ -236,6 +243,7 @@ public class DurableSubsOfflineSelectorC
Listener() {
}
+ @Override
public void onMessage(Message message) {
count++;
if (id != null) {