You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/05 23:13:35 UTC

svn commit: r1346594 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/broker/

Author: gtully
Date: Tue Jun  5 21:13:34 2012
New Revision: 1346594

URL: http://svn.apache.org/viewvc?rev=1346594&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3305 and https://issues.apache.org/jira/browse/AMQ-3872 - fix up durable subs with XA recovery for KahaDB and tidy up test assumptions regarding redelivery for JDBC. Shared tests now pass with both stores.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jun  5 21:13:34 2012
@@ -29,6 +29,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -756,4 +757,22 @@ public abstract class BaseDestination im
         answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
         return answer;
     }
+
+    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
+        // the original ack may be a ranged ack, but we are trying to delete
+        // a specific message from the store here, so we need to convert
+        // to a non-ranged ack.
+        if (ack.getMessageCount() > 0) {
+            // Dup the ack
+            MessageAck a = new MessageAck();
+            ack.copy(a);
+            ack = a;
+            // Convert to non-ranged.
+            ack.setFirstMessageId(node.getMessageId());
+            ack.setLastMessageId(node.getMessageId());
+            ack.setMessageCount(1);
+        }
+        return ack;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun  5 21:13:34 2012
@@ -833,21 +833,7 @@ public class Queue extends BaseDestinati
             throws IOException {
         messageConsumed(context, node);
         if (store != null && node.isPersistent()) {
-            // the original ack may be a ranged ack, but we are trying to delete
-            // a specific
-            // message store here so we need to convert to a non ranged ack.
-            if (ack.getMessageCount() > 0) {
-                // Dup the ack
-                MessageAck a = new MessageAck();
-                ack.copy(a);
-                ack = a;
-                // Convert to non-ranged.
-                ack.setFirstMessageId(node.getMessageId());
-                ack.setLastMessageId(node.getMessageId());
-                ack.setMessageCount(1);
-            }
-
-            store.removeAsyncMessage(context, ack);
+            store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jun  5 21:13:34 2012
@@ -518,7 +518,8 @@ public class Topic extends BaseDestinati
         if (topicStore != null && node.isPersistent()) {
             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             SubscriptionKey key = dsub.getSubscriptionKey();
-            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
+            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
+                    convertToNonRangedAck(ack, node));
         }
         messageConsumed(context, node);
     }
@@ -763,7 +764,7 @@ public class Topic extends BaseDestinati
     }
 
 
-    public void clearPendingMessages(SubscriptionKey subscriptionKey) {
+    private void clearPendingMessages(SubscriptionKey subscriptionKey) {
         dispatchLock.readLock().lock();
         try {
             DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java Tue Jun  5 21:13:34 2012
@@ -337,12 +337,6 @@ public class JdbcMemoryTransactionStore 
                             ack,
                             subscriptionName, clientId);
                     jdbcTopicMessageStore.complete(clientId, subscriptionName);
-
-                    Map<ActiveMQDestination, Destination> destinations = ((JDBCPersistenceAdapter) persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap();
-                    Topic topic = (Topic) destinations.get(topicMessageStore.getDestination());
-                    SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
-                    topic.getDurableTopicSubs().get(key).getPending().rollback(ack.getLastMessageId());
-                    topic.clearPendingMessages(key);
                 }
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jun  5 21:13:34 2012
@@ -711,6 +711,9 @@ public class KahaDBStore extends Message
             command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
             if (ack != null && ack.isUnmatchedAck()) {
                 command.setAck(UNMATCHED);
+            } else {
+                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
+                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             }
             store(command, false, null, null);
         }
@@ -819,6 +822,9 @@ public class KahaDBStore extends Message
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
+                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                         sd.orderIndex.resetCursorPosition();
@@ -858,6 +864,9 @@ public class KahaDBStore extends Message
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                 .hasNext();) {
                             entry = iterator.next();
+                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
                             if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                 counter++;
                             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1346594&r1=1346593&r2=1346594&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Tue Jun  5 21:13:34 2012
@@ -559,11 +559,6 @@ public class XARecoveryBrokerTest extend
     }
 
     public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
-        // REVISIT for kahadb
-        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
-            LOG.warn("only works on jdbc");
-            return;
-        }
         ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
 
         // Setup the producer and send the message.
@@ -581,7 +576,8 @@ public class XARecoveryBrokerTest extend
         consumerInfo.setSubscriptionName("durable");
         connection.send(consumerInfo);
 
-        for (int i = 0; i < 4; i++) {
+        final int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
             Message message = createMessage(producerInfo, destination);
             message.setPersistent(true);
             connection.send(message);
@@ -591,7 +587,7 @@ public class XARecoveryBrokerTest extend
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
 
-        final int messageCount = expectedMessageCount(4, destination);
+        final int messageCount = expectedMessageCount(numMessages, destination);
         Message m = null;
         for (int i = 0; i < messageCount; i++) {
             m = receiveMessage(connection);
@@ -741,12 +737,6 @@ public class XARecoveryBrokerTest extend
 
     public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
 
-        // REVISIT for kahadb
-        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
-            LOG.warn("only works on jdbc");
-            return;
-        }
-
         ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
 
         // Setup the producer and send the message.
@@ -843,12 +833,6 @@ public class XARecoveryBrokerTest extend
 
     public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
 
-        // REVISIT for kahadb
-        if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
-            LOG.warn("only works on jdbc");
-            return;
-        }
-
         ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
 
         // Setup the producer and send the message.
@@ -893,7 +877,20 @@ public class XARecoveryBrokerTest extend
         // rollback so we get redelivery
         connection.request(createRollbackTransaction(connectionInfo, txid));
 
-        LOG.info("new tx for redelivery");
+        LOG.info("new consumer/tx for redelivery");
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        // setup durable subs
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
         txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));