You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/02 18:15:14 UTC
svn commit: r886208 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/broker/
Author: dejanb
Date: Wed Dec 2 17:15:12 2009
New Revision: 886208
URL: http://svn.apache.org/viewvc?rev=886208&view=rev
Log:
merging 885488 - http://issues.apache.org/activemq/browse/AMQ-1498 - broker recovery missing subscriber
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Dec 2 17:15:12 2009
@@ -359,10 +359,14 @@
if (sub == null) {
sub = subscriptions.get(ack.getConsumerId());
if (sub == null) {
- LOG.warn("Ack for non existent subscription, ack:" + ack);
- throw new IllegalArgumentException(
+ if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
+ LOG.warn("Ack for non existent subscription, ack:" + ack);
+ throw new IllegalArgumentException(
"The subscription does not exist: "
+ ack.getConsumerId());
+ } else {
+ return;
+ }
}
consumerExchange.setSubscription(sub);
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java Wed Dec 2 17:15:12 2009
@@ -19,6 +19,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.async.Location;
@@ -55,8 +56,15 @@
public void writePayload(AMQTx amqtx, DataOutput dataOut) throws IOException {
amqtx.getLocation().writeExternal(dataOut);
List<AMQTxOperation> list = amqtx.getOperations();
- dataOut.writeInt(list.size());
+ List<AMQTxOperation> ops = new ArrayList<AMQTxOperation>();
+
for (AMQTxOperation op : list) {
+ if (op.getOperationType() == op.ADD_OPERATION_TYPE) {
+ ops.add(op);
+ }
+ }
+ dataOut.writeInt(ops.size());
+ for (AMQTxOperation op : ops) {
op.writeExternal(wireFormat, dataOut);
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=886208&r1=886207&r2=886208&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Wed Dec 2 17:15:12 2009
@@ -208,6 +208,63 @@
Message m = receiveMessage(connection);
assertNull(m);
}
+
+ public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Setup the consumer and receive the message.
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+ for (int i = 0; i < 4; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+ MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+ }
+
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+
+ // restart the broker.
+ restartBroker();
+
+ // Setup the consumer and receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // All messages should be re-delivered.
+ for (int i = 0; i < 4; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+ }
+
+ assertNoMessagesLeft(connection);
+ }
public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {