You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/01 16:32:51 UTC
svn commit: r1345202 [2/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/ main/java/org/apache/activemq/store/
main/java/org/apache/activ...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Fri Jun 1 14:32:50 2012
@@ -30,6 +30,7 @@ import org.apache.activemq.store.ProxyTo
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.jdbc.JDBCMessageStore;
import java.io.IOException;
import java.util.ArrayList;
@@ -45,16 +46,16 @@ import java.util.concurrent.Future;
*/
public class MemoryTransactionStore implements TransactionStore {
- ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
- ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
- final PersistenceAdapter persistenceAdapter;
+ protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
+ protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
+ protected final PersistenceAdapter persistenceAdapter;
private boolean doingRecover;
public class Tx {
- private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+ public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
- private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+ public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public void add(AddMessageCommand msg) {
messages.add(msg);
@@ -114,6 +115,8 @@ public class MemoryTransactionStore impl
public interface AddMessageCommand {
Message getMessage();
+ MessageStore getMessageStore();
+
void run(ConnectionContext context) throws IOException;
}
@@ -121,6 +124,8 @@ public class MemoryTransactionStore impl
MessageAck getMessageAck();
void run(ConnectionContext context) throws IOException;
+
+ MessageStore getMessageStore();
}
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
@@ -164,7 +169,7 @@ public class MemoryTransactionStore impl
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
- return new ProxyTopicMessageStore(messageStore) {
+ ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
@@ -204,12 +209,17 @@ public class MemoryTransactionStore impl
subscriptionName, messageId, ack);
}
};
+ onProxyTopicStore(proxyTopicMessageStore);
+ return proxyTopicMessageStore;
+ }
+
+ /**
+ * Extension hook that receives each newly created topic store proxy just before
+ * it is returned to the caller. This implementation does nothing.
+ *
+ * @param proxyTopicMessageStore the proxy that was just created
+ */
+ protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void prepare(TransactionId txid) {
+ public void prepare(TransactionId txid) throws IOException {
Tx tx = inflightTransactions.remove(txid);
if (tx == null) {
return;
@@ -226,6 +236,15 @@ public class MemoryTransactionStore impl
return tx;
}
+ /**
+ * Returns the prepared transaction registered under {@code txid}, registering a
+ * new empty one first when none exists.
+ *
+ * @param txid identifier of the prepared transaction
+ * @return the Tx held in preparedTransactions for txid, never null
+ */
+ public Tx getPreparedTx(TransactionId txid) {
+     Tx tx = preparedTransactions.get(txid);
+     if (tx == null) {
+         Tx created = new Tx();
+         // putIfAbsent instead of put: if another thread registered a Tx between the
+         // get above and here, keep theirs rather than overwriting it
+         Tx raced = preparedTransactions.putIfAbsent(txid, created);
+         tx = (raced != null) ? raced : created;
+     }
+     return tx;
+ }
+
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@@ -248,7 +267,7 @@ public class MemoryTransactionStore impl
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void rollback(TransactionId txid) {
+ public void rollback(TransactionId txid) throws IOException {
preparedTransactions.remove(txid);
inflightTransactions.remove(txid);
}
@@ -268,12 +287,16 @@ public class MemoryTransactionStore impl
Object txid = iter.next();
Tx tx = preparedTransactions.get(txid);
listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+ onRecovered(tx);
}
} finally {
this.doingRecover = false;
}
}
+ /**
+ * Extension hook invoked during recovery for each prepared transaction, right
+ * after it has been passed to the recovery listener. This implementation does
+ * nothing.
+ *
+ * @param tx the prepared transaction that was just reported to the listener
+ */
+ protected void onRecovered(Tx tx) {
+ }
+
/**
* @param message
* @throws IOException
@@ -291,6 +314,11 @@ public class MemoryTransactionStore impl
return message;
}
+ @Override
+ public MessageStore getMessageStore() {
+ return destination;
+ }
+
public void run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
}
@@ -320,13 +348,18 @@ public class MemoryTransactionStore impl
public void run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
}
+
+ @Override
+ public MessageStore getMessageStore() {
+ return destination;
+ }
});
} else {
destination.removeMessage(null, ack);
}
}
- final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
+ public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (doingRecover) {
return;
@@ -342,6 +375,11 @@ public class MemoryTransactionStore impl
public void run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
}
+
+ @Override
+ public MessageStore getMessageStore() {
+ return destination;
+ }
});
} else {
destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java?rev=1345202&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java Fri Jun 1 14:32:50 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+
+/**
+ * Runs the inherited {@link XARecoveryBrokerTest} scenarios against a broker that
+ * persists through a {@link JDBCPersistenceAdapter} backed by an embedded Derby
+ * XA data source.
+ */
+public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    /** Data source handed to the JDBC persistence adapter; replaced on every broker restart. */
+    EmbeddedXADataSource dataSource;
+
+    @Override
+    protected void setUp() throws Exception {
+        // created before super.setUp() so it is available to configureBroker()
+        // NOTE(review): presumably super.setUp() is what triggers configureBroker() - confirm
+        dataSource = createDataSource();
+        super.setUp();
+    }
+
+    /**
+     * Tears the broker down and then shuts Derby down. The shutdown sits in a
+     * {@code finally} block so that a failing broker teardown cannot leave the
+     * database running for the next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            stopDerby();
+        }
+    }
+
+    /**
+     * Applies the superclass configuration and then installs a JDBC persistence
+     * adapter that uses {@link #dataSource}.
+     */
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        broker.setPersistenceAdapter(jdbc);
+    }
+
+    /**
+     * Stops the broker and the database, then starts a new broker on a fresh data
+     * source that points at the same database name.
+     */
+    @Override
+    protected void restartBroker() throws Exception {
+        broker.stop();
+        stopDerby();
+        dataSource = createDataSource();
+
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    /** Creates a data source for the "derbyDb" database with the Derby "create" attribute set. */
+    private EmbeddedXADataSource createDataSource() {
+        EmbeddedXADataSource ds = new EmbeddedXADataSource();
+        ds.setDatabaseName("derbyDb");
+        ds.setCreateDatabase("create");
+        return ds;
+    }
+
+    /**
+     * Requests a shutdown of the database behind the current data source. Any
+     * exception raised by the request is deliberately ignored.
+     */
+    private void stopDerby() {
+        LOG.info("STOPPING DB!@!!!!");
+        final EmbeddedDataSource ds = dataSource;
+        try {
+            ds.setShutdownDatabase("shutdown");
+            ds.getConnection();
+        } catch (Exception ignored) {
+            // best effort: Derby is documented to signal a completed shutdown by
+            // throwing from getConnection(), so an exception here is expected
+        }
+    }
+
+    public static Test suite() {
+        return suite(JdbcXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * Uses a queue with the comma-separated name "test,special".
+     * NOTE(review): presumably treated as a composite destination so the inherited
+     * tests exercise more than one queue - confirm.
+     */
+    @Override
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("test,special");
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri Jun 1 14:32:50 2012
@@ -23,9 +23,13 @@ import javax.management.MalformedObjectN
import javax.management.ObjectName;
import junit.framework.Test;
+import org.apache.activemq.broker.jmx.DestinationView;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
@@ -37,6 +41,7 @@ import org.apache.activemq.command.Sessi
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +53,8 @@ import org.slf4j.LoggerFactory;
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
+ public boolean prioritySupport = false;
+
public void testPreparedJmxView() throws Exception {
ActiveMQDestination destination = createDestination();
@@ -96,6 +103,10 @@ public class XARecoveryBrokerTest extend
dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
+ // validate destination depth via jmx
+ DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
+ assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
+
TransactionId first = (TransactionId)dar.getData()[0];
// via jmx, force outcome
for (int i = 0; i < 4; i++) {
@@ -131,6 +142,16 @@ public class XARecoveryBrokerTest extend
return proxy;
}
+ private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException {
+
+ ObjectName objectName = new ObjectName("org.apache.activemq:Type=" + (destination.isQueue() ? "Queue" : "Topic") + ",Destination=" +
+ JMXSupport.encodeObjectNamePart(destination.getPhysicalName()) + ",BrokerName=localhost");
+ DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(objectName,
+ DestinationViewMBean.class, true);
+ return proxy;
+
+ }
+
public void testPreparedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@@ -213,6 +234,94 @@ public class XARecoveryBrokerTest extend
assertNoMessagesLeft(connection);
}
+ /**
+ * Prepares four XA transactions, each sending one persistent message to a topic
+ * that has a durable subscriber, then restarts the broker. Verifies that nothing
+ * is delivered while the transactions are only prepared, that all four are
+ * reported by RECOVER after the restart, that a connection can be closed while
+ * they are outstanding, and that the messages arrive once the recovered
+ * transactions are committed (two-phase).
+ */
+ public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ // Prepare 4 message sends.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+
+ // Prepare
+ connection.send(createPrepareTransaction(connectionInfo, txid));
+ }
+
+ // Since prepared but not committed.. they should not get delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // restart the broker.
+ restartBroker();
+
+ // Setup the consumer and try receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ // Since prepared but not committed.. they should not get delivered.
+ assertNull(receiveMessage(connection));
+ assertNoMessagesLeft(connection);
+
+ Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ DataArrayResponse dar = (DataArrayResponse) response;
+ assertEquals(4, dar.getData().length);
+
+ // ensure we can close a connection with prepared transactions
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // open again to deliver outcome
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ // Commit the prepared transactions.
+ for (int i = 0; i < dar.getData().length; i++) {
+ connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
+ }
+
+ // We should get the committed transactions.
+ for (int i = 0; i < expectedMessageCount(4, destination); i++) {
+ Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+ assertNotNull(m);
+ }
+
+ assertNoMessagesLeft(connection);
+
+ }
+
public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@@ -260,6 +369,55 @@ public class XARecoveryBrokerTest extend
assertNoMessagesLeft(connection);
}
+ /**
+ * Sends four persistent messages inside a single XA transaction, prepares and
+ * then commits it in two phases, restarts the broker and verifies that the
+ * committed messages are still delivered to a new consumer.
+ */
+ public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+ }
+
+ // Commit 2 phase
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+ connection.send(createCommitTransaction2Phase(connectionInfo, txid));
+
+ connection.request(closeConnectionInfo(connectionInfo));
+ // restart the broker.
+ restartBroker();
+
+ // Setup the consumer and receive the message.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // every message sent in the committed transaction must still be there
+ for (int i = 0; i < expectedMessageCount(4, destination); i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+ }
+
+ assertNoMessagesLeft(connection);
+ }
+
public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@@ -396,6 +554,90 @@ public class XARecoveryBrokerTest extend
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
+ /**
+ * Registers both boolean values for the "prioritySupport" field.
+ * NOTE(review): presumably the combination test harness then runs
+ * testTopicPersistentPreparedAcksNotLostOnRestart once per value - confirm.
+ */
+ public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+     final Boolean[] bothSettings = {Boolean.FALSE, Boolean.TRUE};
+     addCombinationValues("prioritySupport", bothSettings);
+ }
+
+ /**
+ * Runs only when the broker uses a JDBCPersistenceAdapter; otherwise logs a
+ * warning and returns. A durable subscriber receives four persistent topic
+ * messages and acknowledges them in an XA transaction that is prepared but not
+ * committed. After a broker restart the test verifies that the transaction is
+ * recovered, that the messages are not redelivered while it is prepared, and
+ * that no prepared transaction remains once it is committed.
+ */
+ public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
+ // REVISIT for kahadb
+ if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+ LOG.warn("only works on jdbc");
+ return;
+ }
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // setup durable subs
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ final int messageCount = expectedMessageCount(4, destination);
+ Message m = null;
+ for (int i = 0; i < messageCount; i++) {
+ m = receiveMessage(connection);
+ assertNotNull("unexpected null on: " + i, m);
+ }
+
+ // one ack with last received, mimic a beforeEnd synchronization
+ MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+
+ // restart the broker.
+ restartBroker();
+
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ connection.send(connectionInfo);
+
+ // validate recovery
+ TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+ DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+ assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+ assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ // no redelivery, exactly once semantics unless there is rollback
+ m = receiveMessage(connection);
+ assertNull(m);
+ assertNoMessagesLeft(connection);
+
+ connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+ // validate recovery complete
+ dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+ assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+ }
+
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
ActiveMQDestination destination = createDestination();
@@ -409,7 +651,8 @@ public class XARecoveryBrokerTest extend
connection.send(sessionInfo);
connection.send(producerInfo);
- for (int i = 0; i < 4; i++) {
+ int numMessages = 4;
+ for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
@@ -426,13 +669,13 @@ public class XARecoveryBrokerTest extend
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
- MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+ MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
@@ -466,7 +709,7 @@ public class XARecoveryBrokerTest extend
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
- // Begin new transaction for redelivery
+ LOG.info("new tx for redelivery");
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
@@ -475,11 +718,11 @@ public class XARecoveryBrokerTest extend
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
- assertNotNull(message);
+ assertNotNull("unexpected null on:" + i, message);
}
- MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+ MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
@@ -492,6 +735,180 @@ public class XARecoveryBrokerTest extend
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
+ /**
+ * Registers both boolean values for the "prioritySupport" field.
+ * NOTE(review): presumably the combination test harness then runs
+ * testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback once per
+ * value - confirm.
+ */
+ public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() {
+     final Boolean[] bothSettings = {Boolean.FALSE, Boolean.TRUE};
+     addCombinationValues("prioritySupport", bothSettings);
+ }
+
+ /**
+ * Runs only when the broker uses a JDBCPersistenceAdapter; otherwise logs a
+ * warning and returns. A durable subscriber receives persistent topic messages
+ * and acknowledges them in an XA transaction that is prepared but not committed.
+ * After a broker restart the test verifies that the transaction is recovered and
+ * that nothing is redelivered while it is prepared. It then rolls the transaction
+ * back, expects every message to be redelivered, acknowledges them in a new
+ * transaction committed in one phase, and checks that no prepared transaction
+ * remains.
+ */
+ public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+
+ // REVISIT for kahadb
+ if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+ LOG.warn("only works on jdbc");
+ return;
+ }
+
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // setup durable subs
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ int numMessages = 4;
+ for (int i = 0; i < numMessages; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = null;
+ for (int i = 0; i < numMessages; i++) {
+ message = receiveMessage(connection);
+ assertNotNull(message);
+ }
+
+ // one ack with last received, mimic a beforeEnd synchronization
+ MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+
+ // restart the broker.
+ restartBroker();
+
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ connection.send(connectionInfo);
+
+ // validate recovery
+ TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+ DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+
+ assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+ assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ // no redelivery, exactly once semantics while prepared
+ message = receiveMessage(connection);
+ assertNull(message);
+ assertNoMessagesLeft(connection);
+
+ // rollback so we get redelivery
+ connection.request(createRollbackTransaction(connectionInfo, txid));
+
+ LOG.info("new tx for redelivery");
+ txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ for (int i = 0; i < numMessages; i++) {
+ message = receiveMessage(connection);
+ assertNotNull("unexpected null on:" + i, message);
+ }
+ ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ // Commit
+ connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+ // validate recovery complete
+ dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+ assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+ }
+
+ /**
+ * Registers both boolean values for the "prioritySupport" field.
+ * NOTE(review): presumably the combination test harness then runs
+ * testTopicPersistentPreparedAcksAvailableAfterRollback once per value - confirm.
+ */
+ public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() {
+     final Boolean[] bothSettings = {Boolean.FALSE, Boolean.TRUE};
+     addCombinationValues("prioritySupport", bothSettings);
+ }
+
+ /**
+ * Runs only when the broker uses a JDBCPersistenceAdapter; otherwise logs a
+ * warning and returns. A durable subscriber receives persistent topic messages
+ * and acknowledges them in an XA transaction that is prepared and then rolled
+ * back, with no broker restart in between. The test expects every message to be
+ * redelivered afterwards and acknowledges them in a new transaction committed in
+ * one phase.
+ */
+ public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+
+ // REVISIT for kahadb
+ if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+ LOG.warn("only works on jdbc");
+ return;
+ }
+
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // setup durable subs
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ int numMessages = 4;
+ for (int i = 0; i < numMessages; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = null;
+ for (int i = 0; i < numMessages; i++) {
+ message = receiveMessage(connection);
+ assertNotNull(message);
+ }
+
+ // one ack with last received, mimic a beforeEnd synchronization
+ MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+
+ // rollback so we get redelivery
+ connection.request(createRollbackTransaction(connectionInfo, txid));
+
+ LOG.info("new tx for redelivery");
+ txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ for (int i = 0; i < numMessages; i++) {
+ message = receiveMessage(connection);
+ assertNotNull("unexpected null on:" + i, message);
+ }
+ ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ // Commit
+ connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+ }
+
private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
}
@@ -564,6 +981,13 @@ public class XARecoveryBrokerTest extend
assertNoMessagesLeft(connection);
}
+ @Override
+ protected PolicyEntry getDefaultPolicy() {
+ PolicyEntry policyEntry = super.getDefaultPolicy();
+ policyEntry.setPrioritizedMessages(prioritySupport);
+ return policyEntry;
+ }
+
public static Test suite() {
return suite(XARecoveryBrokerTest.class);
}