You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/08/31 16:56:08 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5951 -
ensure failover oneway won't retry if reconnect will not happen
Repository: activemq
Updated Branches:
refs/heads/master 1ea289736 -> ae9af4b8b
https://issues.apache.org/jira/browse/AMQ-5951 - ensure failover oneway won't retry if reconnect will not happen
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ae9af4b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ae9af4b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ae9af4b8
Branch: refs/heads/master
Commit: ae9af4b8b29e792db213ca2cc2879ddc7c4118e5
Parents: 1ea2897
Author: gtully <ga...@gmail.com>
Authored: Mon Aug 31 15:55:44 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Aug 31 15:55:44 2015 +0100
----------------------------------------------------------------------
.../transport/failover/FailoverTransport.java | 2 +-
.../transport/failover/FailoverTimeoutTest.java | 65 +++++++++++++++++++-
2 files changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ae9af4b8/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 0bccac0..4e196b3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -674,7 +674,7 @@ public class FailoverTransport implements CompositeTransport {
// If the command was not tracked.. we will retry in
// this method
- if (tracked == null) {
+ if (tracked == null && canReconnect()) {
// since we will retry in this method.. take it
// out of the request
http://git-wip-us.apache.org/repos/asf/activemq/blob/ae9af4b8/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 865d7c9..35a970f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -20,9 +20,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.net.Socket;
import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -31,6 +37,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.MessageAck;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +56,7 @@ public class FailoverTimeoutTest {
public void setUp() throws Exception {
bs = new BrokerService();
bs.setUseJmx(false);
- bs.addConnector("tcp://localhost:0");
+ bs.addConnector(getTransportUri()); // connector URI now comes from getTransportUri(), which is defined outside this hunk
bs.start();
tcpUri = bs.getTransportConnectors().get(0).getConnectUri(); // connect URI of the first connector, reused by the tests to build their failover URIs
}
@@ -119,8 +126,62 @@ public class FailoverTimeoutTest {
bs.waitUntilStarted();
producer.send(message);
-
bs.stop();
+ connection.close();
+ }
+
+ @Test
+ public void testInterleaveSendAndException() throws Exception { // AMQ-5951 regression test: concurrent oneway sends racing a forced socket close must all finish
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0"); // NOTE(review): maxReconnectAttempts=0 presumably means no reconnect after a failure - confirm against the failover transport docs
+ final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+ connection.start();
+
+ connection.setExceptionListener(new ExceptionListener() { // the listener itself issues a oneway send while the failure is being handled
+ @Override
+ public void onException(JMSException exception) {
+ try {
+ LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
+ // try and invoke on connection as part of handling exception
+ connection.asyncSendPacket(new MessageAck());
+ } catch (Exception e) { // any failure of this send is swallowed here; nothing is logged or rethrown
+ }
+ }
+ });
+
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+
+ final int NUM_TASKS = 200;
+ final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS); // counted down once per task, whether its send returned or threw
+
+ for (int i=0; i < NUM_TASKS; i++) {
+
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ connection.asyncSendPacket(new MessageAck());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ } finally {
+ enqueueOnExecutorDone.countDown(); // in finally so a failed send still counts as a finished task
+ }
+
+ }
+ });
+ }
+
+ while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) { // wait until at least 20 tasks have completed before breaking the connection
+ enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
+ }
+
+ // force IOException
+ final Socket socket = connection.getTransport().narrow(Socket.class); // presumably the socket underlying the transport - verify what narrow(Socket.class) returns
+ socket.close();
+
+ executorService.shutdown(); // stops accepting new tasks; tasks already submitted still run
+
+ assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS)); // fails if any task has not counted down within 15 seconds of this call
}
@Test