You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:56:23 UTC

[67/68] [abbrv] activemq-artemis git commit: more test fixes added FailoverTransactionTest back

more test fixes
added FailoverTransactionTest back


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3260d5d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3260d5d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3260d5d1

Branch: refs/heads/refactor-openwire
Commit: 3260d5d15da80cf3563b5945a686afd6016dbba9
Parents: 0182b81
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Mar 21 20:56:09 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java |  14 +-
 tests/activemq5-unit-tests/pom.xml              |   3 -
 .../transport/SoWriteTimeoutClientTest.java     |  57 +++---
 .../failover/FailoverTransactionTest.java       | 184 +++++++++++--------
 4 files changed, 146 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b4056fb..ef9b2a8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -46,7 +46,6 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQConsumer {
-
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
@@ -238,9 +237,16 @@ public class AMQConsumer {
             mi = iter.next();
             if (mi.amqId.equals(lastm)) {
                n++;
-               iter.remove();
-               session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
-               session.getCoreSession().commit();
+               if (!isLocalTx) {
+                  iter.remove();
+                  session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
+               }
+               else {
+                  mi.setLocalAcked(true);
+               }
+               if (tid == null) {
+                  session.getCoreSession().commit();
+               }
                break;
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml
index bbfbcd3..9eee81c 100644
--- a/tests/activemq5-unit-tests/pom.xml
+++ b/tests/activemq5-unit-tests/pom.xml
@@ -436,9 +436,6 @@
                   <include>**/org/apache/activemq/blob/BlobTransferPolicyUriTest.java</include>
                </includes>
                <excludes>
-                  <!-- this test is know to pass (run in standalone) but it seems hang when running in testsuite -->
-                  <exclude>**/org/apache/activemq/transport/failover/FailoverTransactionTest.java</exclude>
-
                   <!-- exclude tests that can cause hang -->
                   <exclude>**/org/apache/activemq/PerDestinationStoreLimitTest.java</exclude>
                   <exclude>**/org/apache/activemq/ProducerFlowControlTest.java</exclude>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 3506ff0..0998599 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -74,36 +74,41 @@ public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
       MessageConsumer consumer = session.createConsumer(dest);
 
       SocketProxy proxy = new SocketProxy();
-      proxy.setTarget(tcpBrokerUri);
-      proxy.open();
-
-      ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
-      final Connection pc = pFactory.createConnection();
-      pc.start();
-      proxy.pause();
-
-      final int messageCount = 20;
-      ExecutorService executorService = Executors.newCachedThreadPool();
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               sendMessages(pc, dest, messageCount);
-            }
-            catch (Exception ignored) {
-               ignored.printStackTrace();
+      try {
+         proxy.setTarget(tcpBrokerUri);
+         proxy.open();
+
+         ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
+         final Connection pc = pFactory.createConnection();
+         pc.start();
+         proxy.pause();
+
+         final int messageCount = 20;
+         ExecutorService executorService = Executors.newCachedThreadPool();
+         executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  sendMessages(pc, dest, messageCount);
+               }
+               catch (Exception ignored) {
+                  ignored.printStackTrace();
+               }
             }
+         });
+
+         // wait for timeout and reconnect
+         TimeUnit.SECONDS.sleep(8);
+         proxy.goOn();
+         for (int i = 0; i < messageCount; i++) {
+            Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000));
          }
-      });
 
-      // wait for timeout and reconnect
-      TimeUnit.SECONDS.sleep(8);
-      proxy.goOn();
-      for (int i = 0; i < messageCount; i++) {
-         Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000));
+         Assert.assertNull(consumer.receive(5000));
+      }
+      finally {
+         proxy.close();
       }
-
-      Assert.assertNull(consumer.receive(5000));
 
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 4aaec57..54fedf1 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -77,7 +77,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
    private static boolean firstSend;
    private static int count;
 
-   private static EmbeddedJMS broker;
+   private static volatile EmbeddedJMS broker;
 
    @Before
    public void setUp() throws Exception {
@@ -111,6 +111,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+      LOG.info(this + " running test testFailoverProducerCloseBeforeTransaction");
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
@@ -144,6 +145,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
            }
    )
    public void testFailoverCommitReplyLost() throws Exception {
+      LOG.info(this + " running test testFailoverCommitReplyLost");
 
       broker = createBroker();
       startBrokerWithDurableQueue();
@@ -177,7 +179,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       }.start();
 
       // will be stopped by the plugin
-      brokerStopLatch.await();
+      brokerStopLatch.await(60, TimeUnit.SECONDS);
       doByteman.set(false);
       broker = createBroker();
       broker.start();
@@ -240,7 +242,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
            }
    )
    public void testFailoverSendReplyLost() throws Exception {
-
+      LOG.info(this + " running test testFailoverSendReplyLost");
       broker = createBroker();
       startBrokerWithDurableQueue();
       doByteman.set(true);
@@ -272,7 +274,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       }.start();
 
       // will be stopped by the plugin
-      brokerStopLatch.await();
+      brokerStopLatch.await(60, TimeUnit.SECONDS);
       doByteman.set(false);
       broker = createBroker();
       LOG.info("restarting....");
@@ -324,7 +326,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
            }
    )
    public void testFailoverConnectionSendReplyLost() throws Exception {
-
+      LOG.info(this + " running test testFailoverConnectionSendReplyLost");
       broker = createBroker();
       proxy = new SocketProxy();
       firstSend = true;
@@ -398,6 +400,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+      LOG.info(this + " running test testFailoverProducerCloseBeforeTransactionFailWhenDisabled");
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
       configureConnectionFactory(cf);
@@ -423,6 +426,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+      LOG.info(this + " running test testFailoverMultipleProducerCloseBeforeTransaction");
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
@@ -457,53 +461,59 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
    // https://issues.apache.org/activemq/browse/AMQ-2772
    @Test
    public void testFailoverWithConnectionConsumer() throws Exception {
+      LOG.info(this + " running test testFailoverWithConnectionConsumer");
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
       Connection connection = cf.createConnection();
       connection.start();
-
-      Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-      Queue destination = session.createQueue(QUEUE_NAME);
-
       final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
-      final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
-         public ServerSession getServerSession() throws JMSException {
-            return new ServerSession() {
-               public Session getSession() throws JMSException {
-                  return poolSession;
-               }
 
-               public void start() throws JMSException {
-                  connectionConsumerGotOne.countDown();
-                  poolSession.run();
-               }
-            };
-         }
-      }, 1);
+      try {
+         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         Queue destination = session.createQueue(QUEUE_NAME);
+
+         final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
+            public ServerSession getServerSession() throws JMSException {
+               return new ServerSession() {
+                  public Session getSession() throws JMSException {
+                     return poolSession;
+                  }
 
-      MessageConsumer consumer = session.createConsumer(destination);
-      MessageProducer producer;
-      TextMessage message;
-      final int count = 10;
-      for (int i = 0; i < count; i++) {
-         producer = session.createProducer(destination);
-         message = session.createTextMessage("Test message: " + count);
-         producer.send(message);
-         producer.close();
-      }
+                  public void start() throws JMSException {
+                     connectionConsumerGotOne.countDown();
+                     poolSession.run();
+                  }
+               };
+            }
+         }, 1);
+
+         MessageConsumer consumer = session.createConsumer(destination);
+         MessageProducer producer;
+         TextMessage message;
+         final int count = 10;
+         for (int i = 0; i < count; i++) {
+            producer = session.createProducer(destination);
+            message = session.createTextMessage("Test message: " + count);
+            producer.send(message);
+            producer.close();
+         }
 
-      // restart to force failover and connection state recovery before the commit
-      broker.stop();
-      startBroker();
+         // restart to force failover and connection state recovery before the commit
+         broker.stop();
+         startBroker();
 
-      session.commit();
-      for (int i = 0; i < count - 1; i++) {
-         Assert.assertNotNull("Failed to get message: " + count, consumer.receive(20000));
+         session.commit();
+         for (int i = 0; i < count - 1; i++) {
+            Message received = consumer.receive(20000);
+            Assert.assertNotNull("Failed to get message: " + count, received);
+         }
+         session.commit();
+      }
+      finally {
+         connection.close();
       }
-      session.commit();
-      connection.close();
 
       Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
    }
@@ -520,6 +530,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
            }
    )
    public void testFailoverConsumerAckLost() throws Exception {
+      LOG.info(this + " running test testFailoverConsumerAckLost");
       // as failure depends on hash order of state tracker recovery, do a few times
       for (int i = 0; i < 3; i++) {
          try {
@@ -621,7 +632,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          }.start();
 
          // will be stopped by the plugin
-         brokerStopLatch.await();
+         brokerStopLatch.await(60, TimeUnit.SECONDS);
          broker = createBroker();
          broker.start();
          doByteman.set(false);
@@ -657,7 +668,9 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       }
 
       // ensure no dangling messages with fresh broker etc
-      broker.stop();
+      if (broker != null) {
+         broker.stop();
+      }
 
       LOG.info("Checking for remaining/hung messages..");
       broker = createBroker();
@@ -667,17 +680,20 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
       connection = cf.createConnection();
-      connection.start();
-      Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer sweeper = sweeperSession.createConsumer(destination);
-      msg = sweeper.receive(1000);
-      if (msg == null) {
-         msg = sweeper.receive(5000);
+      try {
+         connection.start();
+         Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer sweeper = sweeperSession.createConsumer(destination);
+         msg = sweeper.receive(1000);
+         if (msg == null) {
+            msg = sweeper.receive(5000);
+         }
+         LOG.info("Sweep received: " + msg);
+         Assert.assertNull("no messges left dangling but got: " + msg, msg);
+      }
+      finally {
+         connection.close();
       }
-      LOG.info("Sweep received: " + msg);
-      Assert.assertNull("no messges left dangling but got: " + msg, msg);
-      connection.close();
-
       broker.stop();
    }
 
@@ -693,6 +709,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
            }
    )
    public void testPoolingNConsumesAfterReconnect() throws Exception {
+      LOG.info(this + " running test testPoolingNConsumesAfterReconnect");
       broker = createBroker();
       startBrokerWithDurableQueue();
 
@@ -785,8 +802,10 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
          consumer.close();
 
+         LOG.info("waiting latch: " + brokerStopLatch.getCount());
          // will be stopped by the plugin
-         brokerStopLatch.await();
+         Assert.assertTrue(brokerStopLatch.await(60, TimeUnit.SECONDS));
+
          doByteman.set(false);
          broker = createBroker();
          broker.start();
@@ -819,45 +838,50 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
+      LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries");
       broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
       Connection connection = cf.createConnection();
-      connection.start();
-      final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-      final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-      MessageConsumer consumer = consumerSession.createConsumer(destination);
+      try {
+         connection.start();
+         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = consumerSession.createConsumer(destination);
 
-      produceMessage(producerSession, destination);
+         produceMessage(producerSession, destination);
 
-      Message msg = consumer.receive(20000);
-      Assert.assertNotNull(msg);
+         Message msg = consumer.receive(20000);
+         Assert.assertNotNull(msg);
 
-      broker.stop();
-      broker = createBroker();
-      // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
-      broker.start();
+         broker.stop();
+         broker = createBroker();
+         // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
+         broker.start();
 
-      try {
-         consumerSession.commit();
-         Assert.fail("expected transaciton rolledback ex");
+         try {
+            consumerSession.commit();
+            Assert.fail("expected transaciton rolledback ex");
+         }
+         catch (TransactionRolledBackException expected) {
+         }
+
+         broker.stop();
+         broker = createBroker();
+         broker.start();
+         Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
       }
-      catch (TransactionRolledBackException expected) {
+      finally {
+         connection.close();
       }
-
-      broker.stop();
-      broker = createBroker();
-      broker.start();
-
-      Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
-      connection.close();
    }
 
    @Test
    public void testWaitForMissingRedeliveries() throws Exception {
-      LOG.info("testWaitForMissingRedeliveries()");
+      LOG.info(this + " running test testWaitForMissingRedeliveries");
+
       broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
@@ -906,7 +930,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    @Test
    public void testReDeliveryWhilePending() throws Exception {
-      LOG.info("testReDeliveryWhilePending()");
+      LOG.info(this + " running test testReDeliveryWhilePending");
       broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -989,6 +1013,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
                LOG.info("Stopping broker post commit...");
                try {
                   broker.stop();
+                  broker = null;
                }
                catch (Exception e) {
                   e.printStackTrace();
@@ -1028,6 +1053,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
                public void run() {
                   try {
                      broker.stop();
+                     broker = null;
                   }
                   catch (Exception e) {
                      e.printStackTrace();