You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/09/30 12:41:47 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5854 - fix intermittent test failure

Repository: activemq
Updated Branches:
  refs/heads/master 94b56977d -> fc2553574


https://issues.apache.org/jira/browse/AMQ-5854 - fix intermittent test failure


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc255357
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc255357
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc255357

Branch: refs/heads/master
Commit: fc25535748fb8dbaea588203086c4802d1ccf091
Parents: 8514e38
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 30 11:40:30 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 30 11:41:08 2015 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/TransactionContext.java |  2 +-
 .../failover/FailoverTransactionTest.java       | 36 ++++++++++++--------
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fc255357/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 0925863..da05059 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -313,7 +313,7 @@ public class TransactionContext implements XAResource {
             throw e;
         }
 
-        if (rollbackOnly) {
+        if (transactionId != null && rollbackOnly) {
             final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
             try {
                 rollback();

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc255357/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 1f5ac7d..3d29ee0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -1120,6 +1120,8 @@ public class FailoverTransactionTest extends TestSupport {
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final Session secondConsumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+
         MessageConsumer consumer = consumerSession.createConsumer(destination);
 
         produceMessage(producerSession, destination);
@@ -1130,22 +1132,25 @@ public class FailoverTransactionTest extends TestSupport {
         assertNotNull("got message just produced", msg);
 
         // add another consumer into the mix that may get the message after restart
-        MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
+        MessageConsumer consumer2 = secondConsumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
 
         broker.stop();
         broker = createBroker(false, url);
         broker.start();
 
         final CountDownLatch commitDone = new CountDownLatch(1);
+        final CountDownLatch gotRollback = new CountDownLatch(1);
 
         final Vector<Exception> exceptions = new Vector<Exception>();
 
-        // commit may fail if other consumer gets the message on restart
+        // commit will fail due to failover with outstanding ack
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
                 try {
                     consumerSession.commit();
+                } catch (TransactionRolledBackException ex) {
+                    gotRollback.countDown();
                 } catch (JMSException ex) {
                     exceptions.add(ex);
                 } finally {
@@ -1156,23 +1161,24 @@ public class FailoverTransactionTest extends TestSupport {
 
 
         assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
+        assertTrue("got Rollback", gotRollback.await(15, TimeUnit.SECONDS));
 
-        // either message redelivered in existing tx or consumed by consumer2
-        // should not be available again in any event
-        assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
+        assertTrue("no other exceptions", exceptions.isEmpty());
 
         // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
-        if (exceptions.isEmpty()) {
-            LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
-            assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
-        } else {
-            LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
-            assertNotNull("consumer2 got message", consumer2.receive(2000));
-            consumerSession.commit();
-            // no message should be in dlq
-            MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
-            assertNull("nothing in the dlq", dlqConsumer.receive(5000));
+        // consume message from one of the consumers
+        Message message = consumer2.receive(2000);
+        if (message == null) {
+            message = consumer.receive(2000);
         }
+        consumerSession.commit();
+        secondConsumerSession.commit();
+
+        assertNotNull("got message after rollback", message);
+
+        // no message should be in dlq
+        MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+        assertNull("nothing in the dlq", dlqConsumer.receive(2000));
         connection.close();
     }
 


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5951 - scenario wheere transaction command can block, additional test and further fix

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5951 - scenario wheere transaction command can block, additional test and further fix


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8514e381
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8514e381
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8514e381

Branch: refs/heads/master
Commit: 8514e38135cf3c4da913806f3677a89785613e10
Parents: 94b5697
Author: gtully <ga...@gmail.com>
Authored: Tue Sep 29 16:34:36 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 30 11:41:08 2015 +0100

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   |  8 +++-
 .../transport/failover/FailoverTimeoutTest.java | 48 ++++++++++++++++----
 2 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 4e196b3..0f36d67 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -607,7 +607,7 @@ public class FailoverTransport implements CompositeTransport {
                         long start = System.currentTimeMillis();
                         boolean timedout = false;
                         while (transport == null && !disposed && connectionFailure == null
-                                && !Thread.currentThread().isInterrupted()) {
+                                && !Thread.currentThread().isInterrupted() && willReconnect()) {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Waiting for transport to reconnect..: " + command);
                             }
@@ -639,6 +639,8 @@ public class FailoverTransport implements CompositeTransport {
                                 error = connectionFailure;
                             } else if (timedout == true) {
                                 error = new IOException("Failover timeout of " + timeout + " ms reached.");
+                            } else if (!willReconnect()) {
+                                error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded");
                             } else {
                                 error = new IOException("Unexpected failure.");
                             }
@@ -723,6 +725,10 @@ public class FailoverTransport implements CompositeTransport {
         }
     }
 
+    private boolean willReconnect() {
+        return firstConnection || 0 != calculateReconnectAttemptLimit();
+    }
+
     @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");

http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 35a970f..7c36840 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
@@ -37,7 +38,10 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -97,6 +101,14 @@ public class FailoverTimeoutTest {
         LOG.info("Time spent waiting to connect: {} ms", duration);
 
         assertTrue(duration > 3000);
+
+        safeClose(connection);
+    }
+
+    private void safeClose(Connection connection) {
+        try {
+            connection.close();
+        } catch (Exception ignored) {}
     }
 
     @Test
@@ -131,10 +143,29 @@ public class FailoverTimeoutTest {
     }
 
     @Test
-    public void testInterleaveSendAndException() throws Exception {
+    public void testInterleaveAckAndException() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+        doTestInterleaveAndException(connection, new MessageAck());
+        safeClose(connection);
+    }
 
+    @Test
+    public void testInterleaveTxAndException() throws Exception {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
         final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connection.getConnectionInfo().getConnectionId());
+        tx.setTransactionId(new LocalTransactionId(tx.getConnectionId(), 1));
+        doTestInterleaveAndException(connection, tx);
+
+        safeClose(connection);
+    }
+
+    public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command command) throws Exception {
+
         connection.start();
 
         connection.setExceptionListener(new ExceptionListener() {
@@ -143,7 +174,7 @@ public class FailoverTimeoutTest {
                 try {
                     LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
                     // try and invoke on connection as part of handling exception
-                    connection.asyncSendPacket(new MessageAck());
+                    connection.asyncSendPacket(command);
                 } catch (Exception e) {
                 }
             }
@@ -154,24 +185,24 @@ public class FailoverTimeoutTest {
         final int NUM_TASKS = 200;
         final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
 
+        // let a few tasks delay a bit
+        final AtomicLong sleepMillis = new AtomicLong(1000);
         for (int i=0; i < NUM_TASKS; i++) {
-
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        connection.asyncSendPacket(new MessageAck());
-                    } catch (JMSException e) {
-                        e.printStackTrace();
+                        TimeUnit.MILLISECONDS.sleep(Math.max(0, sleepMillis.addAndGet(-50)));
+                        connection.asyncSendPacket(command);
+                    } catch (Exception e) {
                     } finally {
                         enqueueOnExecutorDone.countDown();
                     }
-
                 }
             });
         }
 
-        while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
+        while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 10)) {
             enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
         }
 
@@ -184,6 +215,7 @@ public class FailoverTimeoutTest {
         assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
     }
 
+
     @Test
     public void testUpdateUris() throws Exception {