You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/24 23:21:10 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-98

Repository: qpid-jms
Updated Branches:
  refs/heads/master 5421bf334 -> 2e89d0fb4


https://issues.apache.org/jira/browse/QPIDJMS-98

Improvements to the transports to ensure that, if a connect attempt is
going to fail, we don't also fire off an async error ahead of that failure,
as doing so can lead to contention or deadlock during failover when we try
to detect and clean up failed connect attempts.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2e89d0fb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2e89d0fb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2e89d0fb

Branch: refs/heads/master
Commit: 2e89d0fb440203569fa352210589258124de42d6
Parents: 5421bf3
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Aug 24 17:20:35 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Aug 24 17:20:35 2015 -0400

----------------------------------------------------------------------
 .../jms/transports/netty/NettyTcpTransport.java | 31 ++++++++++++++---
 .../transports/netty/NettySslTransportTest.java | 35 +++++++++-----------
 .../transports/netty/NettyTcpTransportTest.java | 22 ++++++------
 3 files changed, 52 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e89d0fb/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 8035161..7b02238 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -64,6 +64,7 @@ public class NettyTcpTransport implements Transport {
     private final AtomicBoolean closed = new AtomicBoolean();
     private final CountDownLatch connectLatch = new CountDownLatch(1);
     private IOException failureCause;
+    private Throwable pendingFailure;
 
     /**
      * Create a new transport instance
@@ -150,6 +151,17 @@ public class NettyTcpTransport implements Transport {
             }
 
             throw failureCause;
+        } else {
+            // Connected, allow any held async error to fire now and close the transport.
+            channel.eventLoop().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    if (pendingFailure != null) {
+                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    }
+                }
+            });
         }
     }
 
@@ -292,8 +304,7 @@ public class NettyTcpTransport implements Transport {
         @Override
         public void channelInactive(ChannelHandlerContext context) throws Exception {
             LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-            if (!closed.get()) {
-                connected.set(false);
+            if (connected.compareAndSet(true, false) && !closed.get()) {
                 LOG.trace("Firing onTransportClosed listener");
                 listener.onTransportClosed();
             }
@@ -302,10 +313,20 @@ public class NettyTcpTransport implements Transport {
         @Override
         public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
             LOG.trace("Exception on channel! Channel is {}", context.channel());
-            if (!closed.get()) {
-                connected.set(false);
+            if (connected.compareAndSet(true, false) && !closed.get()) {
                 LOG.trace("Firing onTransportError listener");
-                listener.onTransportError(cause);
+                if (pendingFailure != null) {
+                    listener.onTransportError(pendingFailure);
+                } else {
+                    listener.onTransportError(cause);
+                }
+            } else {
+                // Hold the first failure for later dispatch if connect succeeds.
+                // This will then trigger disconnect using the first error reported.
+                if (pendingFailure != null) {
+                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+                    pendingFailure = cause;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e89d0fb/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
index 92537ef..adadd38 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
@@ -28,7 +28,6 @@ import java.net.URISyntaxException;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 
-import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
@@ -80,9 +79,9 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(false));
             try {
                 transport.connect();
-                fail("Should not have connected to the server");
+                fail("Should not have connected to the server: " + serverLocation);
             } catch (Exception e) {
-                LOG.info("Connection failed to untrusted test server.");
+                LOG.info("Connection failed to untrusted test server: {}", serverLocation);
             }
 
             assertFalse(transport.isConnected());
@@ -92,11 +91,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
 
         logTransportErrors();
 
-        //TODO: identify if why we also get exception
-        //via listener and whether it can be suppressed
-        if(!QpidJmsTestCase.IS_WINDOWS) {
-            assertTrue(exceptions.isEmpty());
-        }
+        assertTrue(exceptions.isEmpty());
     }
 
     @Test(timeout = 60 * 1000)
@@ -115,9 +110,9 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
                 transport.connect();
-                fail("Should not have connected to the server");
+                fail("Should not have connected to the server: " + serverLocation);
             } catch (Exception e) {
-                LOG.info("Connection failed to untrusted test server.");
+                LOG.info("Connection failed to untrusted test server: {}", serverLocation);
             }
 
             assertFalse(transport.isConnected());
@@ -137,9 +132,9 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(true));
             try {
                 transport.connect();
-                LOG.info("Connection established to untrusted test server.");
+                LOG.info("Connection established to untrusted test server: {}", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -166,9 +161,9 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             NettySslTransport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
                 transport.connect();
-                LOG.info("Connection established to test server.");
+                LOG.info("Connection established to test server: {}", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -205,9 +200,9 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             NettySslTransport transport = createTransport(serverLocation, testListener, clientOptions);
             try {
                 transport.connect();
-                LOG.info("Connection established to test server.");
+                LOG.info("Connection established to test server: {}", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -259,14 +254,14 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
             try {
                 transport.connect();
                 if (verifyHost) {
-                    fail("Should not have connected to the server");
+                    fail("Should not have connected to the server: " + serverLocation);
                 }
             } catch (Exception e) {
                 if (verifyHost) {
-                    LOG.info("Connection failed to test server as expected.");
+                    LOG.info("Connection failed to test server: {} as expected.", serverLocation);
                 } else {
-                    LOG.error("Failed to connect to test server", e);
-                    fail("Should have connected to the server, but got: " + e);
+                    LOG.error("Failed to connect to test server: " + serverLocation, e);
+                    fail("Should have connected to the server: " + serverLocation + ", but got: " + e);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e89d0fb/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index e99acd5..7b3f73b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -91,7 +91,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             Transport transport = createTransport(serverLocation, testListener, createClientOptions());
             try {
                 transport.connect();
-                fail("Should have failed to connect to the server");
+                fail("Should have failed to connect to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Failed to connect to: {} as expected.", serverLocation);
             }
@@ -117,7 +117,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             Transport transport = createTransport(serverLocation, null, createClientOptions());
             try {
                 transport.connect();
-                fail("Should have failed to connect to the server");
+                fail("Should have failed to connect to the server: " + serverLocation);
             } catch (Exception e) {
                 LOG.info("Failed to connect to: {} as expected.", serverLocation);
             }
@@ -145,7 +145,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should not have failed to connect to the server");
+                fail("Should not have failed to connect to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -167,7 +167,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -204,7 +204,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                     LOG.info("Connected to server:{} as expected.", serverLocation);
                     transports.add(transport);
                 } catch (Exception e) {
-                    fail("Should have connected to the server");
+                    fail("Should have connected to the server: " + serverLocation);
                 }
             }
 
@@ -243,7 +243,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                     transport.send(sendBuffer.copy());
                     transports.add(transport);
                 } catch (Exception e) {
-                    fail("Should have connected to the server");
+                    fail("Should have connected to the server: " + serverLocation);
                 }
             }
 
@@ -279,7 +279,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -317,7 +317,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -345,7 +345,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -397,7 +397,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());
@@ -441,7 +441,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.connect();
                 LOG.info("Connected to server:{} as expected.", serverLocation);
             } catch (Exception e) {
-                fail("Should have connected to the server");
+                fail("Should have connected to the server: " + serverLocation);
             }
 
             assertTrue(transport.isConnected());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org