You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/02 12:14:06 UTC

svn commit: r739961 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/ activemq-ra/src/test/java/org/apache/activemq/ra/

Author: gtully
Date: Mon Feb  2 11:14:05 2009
New Revision: 739961

URL: http://svn.apache.org/viewvc?rev=739961&view=rev
Log:
Resolve the broken RA tests: allow XA operations on a non-transacted session, and make the in-transaction check XA-aware.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Feb  2 11:14:05 2009
@@ -592,7 +592,7 @@
      */
     public void close() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
-            if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) {
+            if (session.getTransactionContext().isInTransaction()) {
                 session.getTransactionContext().addSynchronization(new Synchronization() {
                     public void afterCommit() throws Exception {
                         doClose();
@@ -667,7 +667,7 @@
             
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now.
-            if (!session.isTransacted()) { 
+            if (!session.getTransacted()) { 
                 deliverAcks();
                 if (session.isDupsOkAcknowledge()) {
                     acknowledge();
@@ -752,7 +752,7 @@
             synchronized(deliveredMessages) {
                 deliveredMessages.addFirst(md);
             }
-            if (session.isTransacted()) {
+            if (session.getTransacted()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
             }
         }
@@ -766,7 +766,7 @@
             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
         } else {
             stats.onMessage();
-            if (session.isTransacted()) {
+            if (session.getTransacted()) {
                 // Do nothing.
             } else if (session.isAutoAcknowledge()) {
                 synchronized (deliveredMessages) {
@@ -830,7 +830,7 @@
         // Don't acknowledge now, but we may need to let the broker know the
         // consumer got the message
         // to expand the pre-fetch window
-        if (session.isTransacted()) {
+        if (session.getTransacted()) {
             session.doStartTransaction();
             if (!synchronizationRegistered) {
                 synchronizationRegistered = true;
@@ -892,7 +892,7 @@
             if (ack == null)
             	return; // no msgs
             
-            if (session.isTransacted()) {
+            if (session.getTransacted()) {
                 session.doStartTransaction();
                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
             }
@@ -903,7 +903,7 @@
             deliveredCounter -= deliveredMessages.size();
             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
     
-            if (!session.isTransacted()) {
+            if (!session.getTransacted()) {
                 deliveredMessages.clear();
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Feb  2 11:14:05 2009
@@ -1955,7 +1955,7 @@
     }
     
     protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
-        if (lazy || connection.isSendAcksAsync() || isTransacted()) {
+        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
             asyncSendPacket(ack);
         } else {
             syncSendPacket(ack);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Feb  2 11:14:05 2009
@@ -90,6 +90,10 @@
         return transactionId != null && transactionId.isLocalTransaction();
     }
 
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+    
     /**
      * @return Returns the localTransactionEventListener.
      */

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Mon Feb  2 11:14:05 2009
@@ -203,13 +203,14 @@
                 }
                 session.close();
             }
+            consumeMessage(connection, maxRetries + 1);
         }
     }
     
     // AMQ-1593
     public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
 
-        final int numMessages = 1;
+       final int numMessages = 1;
        ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0");
         Connection connection = connectionFactory.createConnection();
@@ -233,37 +234,55 @@
                 }
                 session.close();
             }
+            
+            consumeMessage(connection, maxRetries + 1);
         }
     }
 
+
+    private void consumeMessage(Connection connection, final int deliveryCount)
+            throws JMSException {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(destinationName);
+        MessageConsumer consumer = session.createConsumer(destination);            
+        TextMessage msg = (TextMessage) consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount"));
+        session.commit();
+        session.close();
+    }
+
     public void testRedeliveryPropertyWithNoRollback() throws Exception {
+        final int numMessages = 1;
         ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(nbMessages, destinationName, connection);
+        populateDestination(numMessages, destinationName, connection);
         connection.close();
         
         {
             AtomicInteger received = new AtomicInteger();
-            while (received.get() < nbMessages) {
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
                 connection = connectionFactory.createConnection();
                 connection.start();
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                 Destination destination = session.createQueue(destinationName);
 
                 MessageConsumer consumer = session.createConsumer(destination);            
                 TextMessage msg = (TextMessage) consumer.receive(2000);
                 if (msg != null) {
-                    LOG.info("Received message " + msg.getText() + 
-                            " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
-                    assertFalse(msg.getJMSRedelivered());
-                    assertEquals(1, msg.getLongProperty("JMSXDeliveryCount"));
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
                 }
                 session.close();
                 connection.close();
             }
+            connection = connectionFactory.createConnection();
+            connection.start();
+            consumeMessage(connection, maxRetries + 1);
         }
     }
     

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java?rev=739961&r1=739960&r2=739961&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java Mon Feb  2 11:14:05 2009
@@ -47,15 +47,6 @@
     private static long txGenerator;
     private Xid xid;
 
-    
-    // TODO fix for XA
-    public void testReceiveTwoThenCloseConnection() throws Exception {}
-    public void testReceiveRollback() throws Exception {}
-    public void testReceiveTwoThenRollback() throws Exception {}
-    public void testReceiveTwoThenRollbackManyTimes() throws Exception {}
-    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {}
-    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {}
-
     @Override
     protected void setSessionTransacted() {
         resourceProvider.setTransacted(false);