You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/09/30 12:41:47 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5854 - fix intermittent test
failure
Repository: activemq
Updated Branches:
refs/heads/master 94b56977d -> fc2553574
https://issues.apache.org/jira/browse/AMQ-5854 - fix intermittent test failure
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc255357
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc255357
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc255357
Branch: refs/heads/master
Commit: fc25535748fb8dbaea588203086c4802d1ccf091
Parents: 8514e38
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 30 11:40:30 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 30 11:41:08 2015 +0100
----------------------------------------------------------------------
.../org/apache/activemq/TransactionContext.java | 2 +-
.../failover/FailoverTransactionTest.java | 36 ++++++++++++--------
2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc255357/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 0925863..da05059 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -313,7 +313,7 @@ public class TransactionContext implements XAResource {
throw e;
}
- if (rollbackOnly) {
+ if (transactionId != null && rollbackOnly) {
final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks";
try {
rollback();
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc255357/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 1f5ac7d..3d29ee0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -1120,6 +1120,8 @@ public class FailoverTransactionTest extends TestSupport {
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Session secondConsumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
@@ -1130,22 +1132,25 @@ public class FailoverTransactionTest extends TestSupport {
assertNotNull("got message just produced", msg);
// add another consumer into the mix that may get the message after restart
- MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
+ MessageConsumer consumer2 = secondConsumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop();
broker = createBroker(false, url);
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
+ final CountDownLatch gotRollback = new CountDownLatch(1);
final Vector<Exception> exceptions = new Vector<Exception>();
- // commit may fail if other consumer gets the message on restart
+ // commit will fail due to failover with outstanding ack
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
+ } catch (TransactionRolledBackException ex) {
+ gotRollback.countDown();
} catch (JMSException ex) {
exceptions.add(ex);
} finally {
@@ -1156,23 +1161,24 @@ public class FailoverTransactionTest extends TestSupport {
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
+ assertTrue("got Rollback", gotRollback.await(15, TimeUnit.SECONDS));
- // either message redelivered in existing tx or consumed by consumer2
- // should not be available again in any event
- assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
+ assertTrue("no other exceptions", exceptions.isEmpty());
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
- if (exceptions.isEmpty()) {
- LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
- assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
- } else {
- LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
- assertNotNull("consumer2 got message", consumer2.receive(2000));
- consumerSession.commit();
- // no message should be in dlq
- MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
- assertNull("nothing in the dlq", dlqConsumer.receive(5000));
+ // consume message from one of the consumers
+ Message message = consumer2.receive(2000);
+ if (message == null) {
+ message = consumer.receive(2000);
}
+ consumerSession.commit();
+ secondConsumerSession.commit();
+
+ assertNotNull("got message after rollback", message);
+
+ // no message should be in dlq
+ MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+ assertNull("nothing in the dlq", dlqConsumer.receive(2000));
connection.close();
}
[2/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5951 - scenario where transaction
command can block, additional test and further fix
Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5951 - scenario where transaction command can block, additional test and further fix
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8514e381
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8514e381
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8514e381
Branch: refs/heads/master
Commit: 8514e38135cf3c4da913806f3677a89785613e10
Parents: 94b5697
Author: gtully <ga...@gmail.com>
Authored: Tue Sep 29 16:34:36 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 30 11:41:08 2015 +0100
----------------------------------------------------------------------
.../transport/failover/FailoverTransport.java | 8 +++-
.../transport/failover/FailoverTimeoutTest.java | 48 ++++++++++++++++----
2 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 4e196b3..0f36d67 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -607,7 +607,7 @@ public class FailoverTransport implements CompositeTransport {
long start = System.currentTimeMillis();
boolean timedout = false;
while (transport == null && !disposed && connectionFailure == null
- && !Thread.currentThread().isInterrupted()) {
+ && !Thread.currentThread().isInterrupted() && willReconnect()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for transport to reconnect..: " + command);
}
@@ -639,6 +639,8 @@ public class FailoverTransport implements CompositeTransport {
error = connectionFailure;
} else if (timedout == true) {
error = new IOException("Failover timeout of " + timeout + " ms reached.");
+ } else if (!willReconnect()) {
+ error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded");
} else {
error = new IOException("Unexpected failure.");
}
@@ -723,6 +725,10 @@ public class FailoverTransport implements CompositeTransport {
}
}
+ private boolean willReconnect() {
+ return firstConnection || 0 != calculateReconnectAttemptLimit();
+ }
+
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 35a970f..7c36840 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
@@ -37,7 +38,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -97,6 +101,14 @@ public class FailoverTimeoutTest {
LOG.info("Time spent waiting to connect: {} ms", duration);
assertTrue(duration > 3000);
+
+ safeClose(connection);
+ }
+
+ private void safeClose(Connection connection) {
+ try {
+ connection.close();
+ } catch (Exception ignored) {}
}
@Test
@@ -131,10 +143,29 @@ public class FailoverTimeoutTest {
}
@Test
- public void testInterleaveSendAndException() throws Exception {
+ public void testInterleaveAckAndException() throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
+ final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+ doTestInterleaveAndException(connection, new MessageAck());
+ safeClose(connection);
+ }
+ @Test
+ public void testInterleaveTxAndException() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+ TransactionInfo tx = new TransactionInfo();
+ tx.setConnectionId(connection.getConnectionInfo().getConnectionId());
+ tx.setTransactionId(new LocalTransactionId(tx.getConnectionId(), 1));
+ doTestInterleaveAndException(connection, tx);
+
+ safeClose(connection);
+ }
+
+ public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command command) throws Exception {
+
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@@ -143,7 +174,7 @@ public class FailoverTimeoutTest {
try {
LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
// try and invoke on connection as part of handling exception
- connection.asyncSendPacket(new MessageAck());
+ connection.asyncSendPacket(command);
} catch (Exception e) {
}
}
@@ -154,24 +185,24 @@ public class FailoverTimeoutTest {
final int NUM_TASKS = 200;
final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
+ // let a few tasks delay a bit
+ final AtomicLong sleepMillis = new AtomicLong(1000);
for (int i=0; i < NUM_TASKS; i++) {
-
executorService.submit(new Runnable() {
@Override
public void run() {
try {
- connection.asyncSendPacket(new MessageAck());
- } catch (JMSException e) {
- e.printStackTrace();
+ TimeUnit.MILLISECONDS.sleep(Math.max(0, sleepMillis.addAndGet(-50)));
+ connection.asyncSendPacket(command);
+ } catch (Exception e) {
} finally {
enqueueOnExecutorDone.countDown();
}
-
}
});
}
- while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
+ while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 10)) {
enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
}
@@ -184,6 +215,7 @@ public class FailoverTimeoutTest {
assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
}
+
@Test
public void testUpdateUris() throws Exception {