You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:56:23 UTC
[67/68] [abbrv] activemq-artemis git commit: more test fixes added FailoverTransactionTest back
more test fixes
added FailoverTransactionTest back
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3260d5d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3260d5d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3260d5d1
Branch: refs/heads/refactor-openwire
Commit: 3260d5d15da80cf3563b5945a686afd6016dbba9
Parents: 0182b81
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Mar 21 20:56:09 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQConsumer.java | 14 +-
tests/activemq5-unit-tests/pom.xml | 3 -
.../transport/SoWriteTimeoutClientTest.java | 57 +++---
.../failover/FailoverTransactionTest.java | 184 +++++++++++--------
4 files changed, 146 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b4056fb..ef9b2a8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -46,7 +46,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
-
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
@@ -238,9 +237,16 @@ public class AMQConsumer {
mi = iter.next();
if (mi.amqId.equals(lastm)) {
n++;
- iter.remove();
- session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
- session.getCoreSession().commit();
+ if (!isLocalTx) {
+ iter.remove();
+ session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
+ }
+ else {
+ mi.setLocalAcked(true);
+ }
+ if (tid == null) {
+ session.getCoreSession().commit();
+ }
break;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml
index bbfbcd3..9eee81c 100644
--- a/tests/activemq5-unit-tests/pom.xml
+++ b/tests/activemq5-unit-tests/pom.xml
@@ -436,9 +436,6 @@
<include>**/org/apache/activemq/blob/BlobTransferPolicyUriTest.java</include>
</includes>
<excludes>
- <!-- this test is know to pass (run in standalone) but it seems hang when running in testsuite -->
- <exclude>**/org/apache/activemq/transport/failover/FailoverTransactionTest.java</exclude>
-
<!-- exclude tests that can cause hang -->
<exclude>**/org/apache/activemq/PerDestinationStoreLimitTest.java</exclude>
<exclude>**/org/apache/activemq/ProducerFlowControlTest.java</exclude>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 3506ff0..0998599 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -74,36 +74,41 @@ public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
MessageConsumer consumer = session.createConsumer(dest);
SocketProxy proxy = new SocketProxy();
- proxy.setTarget(tcpBrokerUri);
- proxy.open();
-
- ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
- final Connection pc = pFactory.createConnection();
- pc.start();
- proxy.pause();
-
- final int messageCount = 20;
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages(pc, dest, messageCount);
- }
- catch (Exception ignored) {
- ignored.printStackTrace();
+ try {
+ proxy.setTarget(tcpBrokerUri);
+ proxy.open();
+
+ ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
+ final Connection pc = pFactory.createConnection();
+ pc.start();
+ proxy.pause();
+
+ final int messageCount = 20;
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sendMessages(pc, dest, messageCount);
+ }
+ catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
}
+ });
+
+ // wait for timeout and reconnect
+ TimeUnit.SECONDS.sleep(8);
+ proxy.goOn();
+ for (int i = 0; i < messageCount; i++) {
+ Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000));
}
- });
- // wait for timeout and reconnect
- TimeUnit.SECONDS.sleep(8);
- proxy.goOn();
- for (int i = 0; i < messageCount; i++) {
- Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000));
+ Assert.assertNull(consumer.receive(5000));
+ }
+ finally {
+ proxy.close();
}
-
- Assert.assertNull(consumer.receive(5000));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3260d5d1/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 4aaec57..54fedf1 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -77,7 +77,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
private static boolean firstSend;
private static int count;
- private static EmbeddedJMS broker;
+ private static volatile EmbeddedJMS broker;
@Before
public void setUp() throws Exception {
@@ -111,6 +111,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
@Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+ LOG.info(this + " running test testFailoverProducerCloseBeforeTransaction");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -144,6 +145,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
)
public void testFailoverCommitReplyLost() throws Exception {
+ LOG.info(this + " running test testFailoverCommitReplyLost");
broker = createBroker();
startBrokerWithDurableQueue();
@@ -177,7 +179,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}.start();
// will be stopped by the plugin
- brokerStopLatch.await();
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
doByteman.set(false);
broker = createBroker();
broker.start();
@@ -240,7 +242,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
)
public void testFailoverSendReplyLost() throws Exception {
-
+ LOG.info(this + " running test testFailoverSendReplyLost");
broker = createBroker();
startBrokerWithDurableQueue();
doByteman.set(true);
@@ -272,7 +274,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}.start();
// will be stopped by the plugin
- brokerStopLatch.await();
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
doByteman.set(false);
broker = createBroker();
LOG.info("restarting....");
@@ -324,7 +326,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
)
public void testFailoverConnectionSendReplyLost() throws Exception {
-
+ LOG.info(this + " running test testFailoverConnectionSendReplyLost");
broker = createBroker();
proxy = new SocketProxy();
firstSend = true;
@@ -398,6 +400,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
@Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+ LOG.info(this + " running test testFailoverProducerCloseBeforeTransactionFailWhenDisabled");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
configureConnectionFactory(cf);
@@ -423,6 +426,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
@Test
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+ LOG.info(this + " running test testFailoverMultipleProducerCloseBeforeTransaction");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -457,53 +461,59 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
// https://issues.apache.org/activemq/browse/AMQ-2772
@Test
public void testFailoverWithConnectionConsumer() throws Exception {
+ LOG.info(this + " running test testFailoverWithConnectionConsumer");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
-
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(QUEUE_NAME);
-
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
- final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
- public ServerSession getServerSession() throws JMSException {
- return new ServerSession() {
- public Session getSession() throws JMSException {
- return poolSession;
- }
- public void start() throws JMSException {
- connectionConsumerGotOne.countDown();
- poolSession.run();
- }
- };
- }
- }, 1);
+ try {
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
+ public ServerSession getServerSession() throws JMSException {
+ return new ServerSession() {
+ public Session getSession() throws JMSException {
+ return poolSession;
+ }
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer;
- TextMessage message;
- final int count = 10;
- for (int i = 0; i < count; i++) {
- producer = session.createProducer(destination);
- message = session.createTextMessage("Test message: " + count);
- producer.send(message);
- producer.close();
- }
+ public void start() throws JMSException {
+ connectionConsumerGotOne.countDown();
+ poolSession.run();
+ }
+ };
+ }
+ }, 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer;
+ TextMessage message;
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ producer = session.createProducer(destination);
+ message = session.createTextMessage("Test message: " + count);
+ producer.send(message);
+ producer.close();
+ }
- // restart to force failover and connection state recovery before the commit
- broker.stop();
- startBroker();
+ // restart to force failover and connection state recovery before the commit
+ broker.stop();
+ startBroker();
- session.commit();
- for (int i = 0; i < count - 1; i++) {
- Assert.assertNotNull("Failed to get message: " + count, consumer.receive(20000));
+ session.commit();
+ for (int i = 0; i < count - 1; i++) {
+ Message received = consumer.receive(20000);
+ Assert.assertNotNull("Failed to get message: " + count, received);
+ }
+ session.commit();
+ }
+ finally {
+ connection.close();
}
- session.commit();
- connection.close();
Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
@@ -520,6 +530,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
)
public void testFailoverConsumerAckLost() throws Exception {
+ LOG.info(this + " running test testFailoverConsumerAckLost");
// as failure depends on hash order of state tracker recovery, do a few times
for (int i = 0; i < 3; i++) {
try {
@@ -621,7 +632,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}.start();
// will be stopped by the plugin
- brokerStopLatch.await();
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
broker = createBroker();
broker.start();
doByteman.set(false);
@@ -657,7 +668,9 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
// ensure no dangling messages with fresh broker etc
- broker.stop();
+ if (broker != null) {
+ broker.stop();
+ }
LOG.info("Checking for remaining/hung messages..");
broker = createBroker();
@@ -667,17 +680,20 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
- connection.start();
- Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer sweeper = sweeperSession.createConsumer(destination);
- msg = sweeper.receive(1000);
- if (msg == null) {
- msg = sweeper.receive(5000);
+ try {
+ connection.start();
+ Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sweeper = sweeperSession.createConsumer(destination);
+ msg = sweeper.receive(1000);
+ if (msg == null) {
+ msg = sweeper.receive(5000);
+ }
+ LOG.info("Sweep received: " + msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
+ }
+ finally {
+ connection.close();
}
- LOG.info("Sweep received: " + msg);
- Assert.assertNull("no messges left dangling but got: " + msg, msg);
- connection.close();
-
broker.stop();
}
@@ -693,6 +709,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
}
)
public void testPoolingNConsumesAfterReconnect() throws Exception {
+ LOG.info(this + " running test testPoolingNConsumesAfterReconnect");
broker = createBroker();
startBrokerWithDurableQueue();
@@ -785,8 +802,10 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
consumer.close();
+ LOG.info("waiting latch: " + brokerStopLatch.getCount());
// will be stopped by the plugin
- brokerStopLatch.await();
+ Assert.assertTrue(brokerStopLatch.await(60, TimeUnit.SECONDS));
+
doByteman.set(false);
broker = createBroker();
broker.start();
@@ -819,45 +838,50 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
@Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
+ LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries");
broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
- connection.start();
- final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
- final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
+ try {
+ connection.start();
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+ final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
- produceMessage(producerSession, destination);
+ produceMessage(producerSession, destination);
- Message msg = consumer.receive(20000);
- Assert.assertNotNull(msg);
+ Message msg = consumer.receive(20000);
+ Assert.assertNotNull(msg);
- broker.stop();
- broker = createBroker();
- // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
- broker.start();
+ broker.stop();
+ broker = createBroker();
+ // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
+ broker.start();
- try {
- consumerSession.commit();
- Assert.fail("expected transaciton rolledback ex");
+ try {
+ consumerSession.commit();
+ Assert.fail("expected transaciton rolledback ex");
+ }
+ catch (TransactionRolledBackException expected) {
+ }
+
+ broker.stop();
+ broker = createBroker();
+ broker.start();
+ Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
}
- catch (TransactionRolledBackException expected) {
+ finally {
+ connection.close();
}
-
- broker.stop();
- broker = createBroker();
- broker.start();
-
- Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
- connection.close();
}
@Test
public void testWaitForMissingRedeliveries() throws Exception {
- LOG.info("testWaitForMissingRedeliveries()");
+ LOG.info(this + " running test testWaitForMissingRedeliveries");
+
broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
@@ -906,7 +930,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
@Test
public void testReDeliveryWhilePending() throws Exception {
- LOG.info("testReDeliveryWhilePending()");
+ LOG.info(this + " running test testReDeliveryWhilePending");
broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -989,6 +1013,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
LOG.info("Stopping broker post commit...");
try {
broker.stop();
+ broker = null;
}
catch (Exception e) {
e.printStackTrace();
@@ -1028,6 +1053,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
public void run() {
try {
broker.stop();
+ broker = null;
}
catch (Exception e) {
e.printStackTrace();