You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/02 12:14:06 UTC
svn commit: r739961 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/test/java/org/apache/activemq/
activemq-ra/src/test/java/org/apache/activemq/ra/
Author: gtully
Date: Mon Feb 2 11:14:05 2009
New Revision: 739961
URL: http://svn.apache.org/viewvc?rev=739961&view=rev
Log:
Resolve issue with broken RA tests: allow XA operations on a non-transacted session; the in-transaction check is now XA-aware.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Feb 2 11:14:05 2009
@@ -592,7 +592,7 @@
*/
public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) {
- if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) {
+ if (session.getTransactionContext().isInTransaction()) {
session.getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
doClose();
@@ -667,7 +667,7 @@
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now.
- if (!session.isTransacted()) {
+ if (!session.getTransacted()) {
deliverAcks();
if (session.isDupsOkAcknowledge()) {
acknowledge();
@@ -752,7 +752,7 @@
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
- if (session.isTransacted()) {
+ if (session.getTransacted()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
@@ -766,7 +766,7 @@
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
stats.onMessage();
- if (session.isTransacted()) {
+ if (session.getTransacted()) {
// Do nothing.
} else if (session.isAutoAcknowledge()) {
synchronized (deliveredMessages) {
@@ -830,7 +830,7 @@
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message
// to expand the pre-fetch window
- if (session.isTransacted()) {
+ if (session.getTransacted()) {
session.doStartTransaction();
if (!synchronizationRegistered) {
synchronizationRegistered = true;
@@ -892,7 +892,7 @@
if (ack == null)
return; // no msgs
- if (session.isTransacted()) {
+ if (session.getTransacted()) {
session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId());
}
@@ -903,7 +903,7 @@
deliveredCounter -= deliveredMessages.size();
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
- if (!session.isTransacted()) {
+ if (!session.getTransacted()) {
deliveredMessages.clear();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Feb 2 11:14:05 2009
@@ -1955,7 +1955,7 @@
}
protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
- if (lazy || connection.isSendAcksAsync() || isTransacted()) {
+ if (lazy || connection.isSendAcksAsync() || getTransacted()) {
asyncSendPacket(ack);
} else {
syncSendPacket(ack);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Feb 2 11:14:05 2009
@@ -90,6 +90,10 @@
return transactionId != null && transactionId.isLocalTransaction();
}
+ public boolean isInTransaction() {
+ return transactionId != null;
+ }
+
/**
* @return Returns the localTransactionEventListener.
*/
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Mon Feb 2 11:14:05 2009
@@ -203,13 +203,14 @@
}
session.close();
}
+ consumeMessage(connection, maxRetries + 1);
}
}
// AMQ-1593
public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
- final int numMessages = 1;
+ final int numMessages = 1;
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0");
Connection connection = connectionFactory.createConnection();
@@ -233,37 +234,55 @@
}
session.close();
}
+
+ consumeMessage(connection, maxRetries + 1);
}
}
+
+ private void consumeMessage(Connection connection, final int deliveryCount)
+ throws JMSException {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(destinationName);
+ MessageConsumer consumer = session.createConsumer(destination);
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount"));
+ session.commit();
+ session.close();
+ }
+
public void testRedeliveryPropertyWithNoRollback() throws Exception {
+ final int numMessages = 1;
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
- populateDestination(nbMessages, destinationName, connection);
+ populateDestination(numMessages, destinationName, connection);
connection.close();
{
AtomicInteger received = new AtomicInteger();
- while (received.get() < nbMessages) {
+ final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+ while (received.get() < maxRetries) {
connection = connectionFactory.createConnection();
connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(2000);
if (msg != null) {
- LOG.info("Received message " + msg.getText() +
- " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertFalse(msg.getJMSRedelivered());
- assertEquals(1, msg.getLongProperty("JMSXDeliveryCount"));
+ LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+ assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
}
session.close();
connection.close();
}
+ connection = connectionFactory.createConnection();
+ connection.start();
+ consumeMessage(connection, maxRetries + 1);
}
}
Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java Mon Feb 2 11:14:05 2009
@@ -47,15 +47,6 @@
private static long txGenerator;
private Xid xid;
-
- // TODO fix for XA
- public void testReceiveTwoThenCloseConnection() throws Exception {}
- public void testReceiveRollback() throws Exception {}
- public void testReceiveTwoThenRollback() throws Exception {}
- public void testReceiveTwoThenRollbackManyTimes() throws Exception {}
- public void testReceiveRollbackWithPrefetchOfOne() throws Exception {}
- public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {}
-
@Override
protected void setSessionTransacted() {
resourceProvider.setTransacted(false);