You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/05 23:13:35 UTC
svn commit: r1346594 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/broker/
Author: gtully
Date: Tue Jun 5 21:13:34 2012
New Revision: 1346594
URL: http://svn.apache.org/viewvc?rev=1346594&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3305 and https://issues.apache.org/jira/browse/AMQ-3872 - fix up durable subs with xa recovery for kahadb and tidy up test assumptions re redelivery for jdbc. Shared tests pass with both stores now
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jun 5 21:13:34 2012
@@ -29,6 +29,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -756,4 +757,22 @@ public abstract class BaseDestination im
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
return answer;
}
+
+ protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
+ // the original ack may be a ranged ack, but we are trying to delete
+ // a specific
+ // message store here so we need to convert to a non ranged ack.
+ if (ack.getMessageCount() > 0) {
+ // Dup the ack
+ MessageAck a = new MessageAck();
+ ack.copy(a);
+ ack = a;
+ // Convert to non-ranged.
+ ack.setFirstMessageId(node.getMessageId());
+ ack.setLastMessageId(node.getMessageId());
+ ack.setMessageCount(1);
+ }
+ return ack;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 5 21:13:34 2012
@@ -833,21 +833,7 @@ public class Queue extends BaseDestinati
throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
- // the original ack may be a ranged ack, but we are trying to delete
- // a specific
- // message store here so we need to convert to a non ranged ack.
- if (ack.getMessageCount() > 0) {
- // Dup the ack
- MessageAck a = new MessageAck();
- ack.copy(a);
- ack = a;
- // Convert to non-ranged.
- ack.setFirstMessageId(node.getMessageId());
- ack.setLastMessageId(node.getMessageId());
- ack.setMessageCount(1);
- }
-
- store.removeAsyncMessage(context, ack);
+ store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jun 5 21:13:34 2012
@@ -518,7 +518,8 @@ public class Topic extends BaseDestinati
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
- topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
+ topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
+ convertToNonRangedAck(ack, node));
}
messageConsumed(context, node);
}
@@ -763,7 +764,7 @@ public class Topic extends BaseDestinati
}
- public void clearPendingMessages(SubscriptionKey subscriptionKey) {
+ private void clearPendingMessages(SubscriptionKey subscriptionKey) {
dispatchLock.readLock().lock();
try {
DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java Tue Jun 5 21:13:34 2012
@@ -337,12 +337,6 @@ public class JdbcMemoryTransactionStore
ack,
subscriptionName, clientId);
jdbcTopicMessageStore.complete(clientId, subscriptionName);
-
- Map<ActiveMQDestination, Destination> destinations = ((JDBCPersistenceAdapter) persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap();
- Topic topic = (Topic) destinations.get(topicMessageStore.getDestination());
- SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
- topic.getDurableTopicSubs().get(key).getPending().rollback(ack.getLastMessageId());
- topic.clearPendingMessages(key);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jun 5 21:13:34 2012
@@ -711,6 +711,9 @@ public class KahaDBStore extends Message
command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
+ } else {
+ org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
+ command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
}
store(command, false, null, null);
}
@@ -819,6 +822,9 @@ public class KahaDBStore extends Message
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
+ if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+ continue;
+ }
listener.recoverMessage(loadMessage(entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
@@ -858,6 +864,9 @@ public class KahaDBStore extends Message
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
+ if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+ continue;
+ }
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Tue Jun 5 21:13:34 2012
@@ -559,11 +559,6 @@ public class XARecoveryBrokerTest extend
}
public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
- // REVISIT for kahadb
- if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
- LOG.warn("only works on jdbc");
- return;
- }
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@@ -581,7 +576,8 @@ public class XARecoveryBrokerTest extend
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
- for (int i = 0; i < 4; i++) {
+ final int numMessages = 4;
+ for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
@@ -591,7 +587,7 @@ public class XARecoveryBrokerTest extend
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
- final int messageCount = expectedMessageCount(4, destination);
+ final int messageCount = expectedMessageCount(numMessages, destination);
Message m = null;
for (int i = 0; i < messageCount; i++) {
m = receiveMessage(connection);
@@ -741,12 +737,6 @@ public class XARecoveryBrokerTest extend
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
- // REVISIT for kahadb
- if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
- LOG.warn("only works on jdbc");
- return;
- }
-
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@@ -843,12 +833,6 @@ public class XARecoveryBrokerTest extend
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
- // REVISIT for kahadb
- if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
- LOG.warn("only works on jdbc");
- return;
- }
-
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@@ -893,7 +877,20 @@ public class XARecoveryBrokerTest extend
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
- LOG.info("new tx for redelivery");
+ LOG.info("new consumer/tx for redelivery");
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+
+ // setup durable subs
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));