You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/02 18:15:14 UTC

svn commit: r886208 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/broker/

Author: dejanb
Date: Wed Dec  2 17:15:12 2009
New Revision: 886208

URL: http://svn.apache.org/viewvc?rev=886208&view=rev
Log:
Merging r885488 - http://issues.apache.org/activemq/browse/AMQ-1498 - broker recovery missing subscriber

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Dec  2 17:15:12 2009
@@ -359,10 +359,14 @@
         if (sub == null) {
             sub = subscriptions.get(ack.getConsumerId());        
             if (sub == null) {
-                LOG.warn("Ack for non existent subscription, ack:" + ack); 
-                throw new IllegalArgumentException(
+                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
+                    LOG.warn("Ack for non existent subscription, ack:" + ack); 
+                    throw new IllegalArgumentException(
                         "The subscription does not exist: "
                         + ack.getConsumerId());
+                } else {
+                    return;
+                }
             }
             consumerExchange.setSubscription(sub);
         }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java Wed Dec  2 17:15:12 2009
@@ -19,6 +19,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.impl.async.Location;
@@ -55,8 +56,15 @@
     public void writePayload(AMQTx amqtx, DataOutput dataOut) throws IOException {
         amqtx.getLocation().writeExternal(dataOut);
         List<AMQTxOperation> list = amqtx.getOperations();
-        dataOut.writeInt(list.size());
+        List<AMQTxOperation> ops = new ArrayList<AMQTxOperation>();
+        
         for (AMQTxOperation op : list) {
+            if (op.getOperationType() == op.ADD_OPERATION_TYPE) {
+                ops.add(op);
+            }
+        }
+        dataOut.writeInt(ops.size());
+        for (AMQTxOperation op : ops) {
             op.writeExternal(wireFormat, dataOut);
         }
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Wed Dec  2 17:15:12 2009
@@ -208,6 +208,63 @@
         Message m = receiveMessage(connection);
         assertNull(m);
     }
+    
+    public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Setup the consumer and receive the message.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+        
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+        
+        // All messages should be re-delivered.
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
 
     public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {