You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/16 18:16:59 UTC

qpid-jms git commit: QPIDJMS-232 Perform an open / close on Connection close if not done yet

Repository: qpid-jms
Updated Branches:
  refs/heads/master 6b56819ea -> 8d211bc8e


QPIDJMS-232 Perform an open / close on Connection close if not done yet

For a connection that was authenticated but never used or lacking a
client ID on the URI we want to do a normal open / close on the AMQP
connection when Connection close is called to complete a normal
connection cycle with the remote before closing the transport down.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8d211bc8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8d211bc8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8d211bc8

Branch: refs/heads/master
Commit: 8d211bc8ee8c7d97658a9647513b2ad9b92d2d5f
Parents: 6b56819
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Dec 16 13:16:42 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Dec 16 13:16:42 2016 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 33 ++++++++---
 .../builders/AmqpClosedConnectionBuilder.java   | 58 ++++++++++++++++++++
 .../amqp/builders/AmqpConnectionBuilder.java    |  7 ++-
 .../ConnectionFactoryIntegrationTest.java       | 55 +++++++++++++++++++
 .../jms/integration/SaslIntegrationTest.java    |  2 +-
 .../jms/provider/amqp/AmqpProviderTest.java     |  8 +++
 6 files changed, 152 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 88cd86c..63bca3a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -53,6 +53,7 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
 import org.apache.qpid.jms.transports.TransportFactory;
 import org.apache.qpid.jms.transports.TransportListener;
@@ -170,9 +171,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             public void run() {
 
                 connectionRequest = connectRequest;
+                AmqpProvider.this.connectionInfo = connectionInfo;
 
-                try
-                {
+                try {
                     protonTransport.setEmitFlowEventOnSend(false);
 
                     if (getMaxFrameSize() > 0) {
@@ -209,8 +210,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     } else {
                         connectRequest.onSuccess();
                     }
-                }
-                catch (Throwable t) {
+                } catch (Throwable t) {
                     connectionRequest.onFailure(IOExceptionSupport.create(t));
                 }
             }
@@ -254,14 +254,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         // just signal success.
                         if (transport == null || !transport.isConnected()) {
                             request.onSuccess();
+                            return;
                         }
 
                         if (connection != null) {
                             connection.close(request);
-                            pumpToProtonTransport(request);
                         } else {
-                            request.onSuccess();
+                            // If the SASL authentication occurred but failed then we don't
+                            // need to do an open / close
+                            if (authenticator != null && !authenticator.wasSuccessful()) {
+                                request.onSuccess();
+                                return;
+                            }
+
+                            // Connection attempt might have been tried and failed so only perform
+                            // an open / close cycle if one hasn't been done already.
+                            if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
+                                AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
+                                builder.buildResource(request);
+
+                                protonConnection.setContext(builder);
+                            } else {
+                                request.onSuccess();
+                            }
                         }
+
+                        pumpToProtonTransport(request);
                     } catch (Exception e) {
                         LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
                     } finally {
@@ -912,8 +930,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     // Close the transport to avoid emitting any additional frames.
                     org.apache.qpid.proton.engine.Transport t = protonConnection.getTransport();
                     t.close_head();
+                } else {
+                    authenticator = null;
                 }
-                authenticator = null;
             }
         } catch (Throwable ex) {
             try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
new file mode 100644
index 0000000..ba940df
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+
+/**
+ * Specialized Builder that create a Connection that is intended to be immediately
+ * closed.
+ */
+public class AmqpClosedConnectionBuilder extends AmqpConnectionBuilder {
+
+    public AmqpClosedConnectionBuilder(AmqpProvider parent, JmsConnectionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected AsyncResult createRequestIntercepter(final AsyncResult request) {
+        return request;
+    }
+
+    @Override
+    protected void afterOpened() {
+        getEndpoint().close();
+    }
+
+    @Override
+    protected void afterClosed(AmqpConnection resource, JmsConnectionInfo resourceInfo) {
+        // If the resource closed and no error was given, we just closed it now to avoid
+        // failing the request with a default error which creates log spam.
+        if (!hasRemoteError()) {
+            request.onSuccess();
+        }
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index 71a4dd4..5792c09 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -48,8 +48,11 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
 
     @Override
     public void buildResource(final AsyncResult request) {
+        super.buildResource(createRequestIntercepter(request));
+    }
 
-        AsyncResult connectionRequest = new AsyncResult() {
+    protected AsyncResult createRequestIntercepter(final AsyncResult request) {
+        return new AsyncResult() {
 
             @Override
             public void onSuccess() {
@@ -90,8 +93,6 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
                 return getResource().isOpen();
             }
         };
-
-        super.buildResource(connectionRequest);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index dc11976..3897186 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -72,7 +72,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -91,7 +96,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -110,7 +120,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -129,7 +144,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -154,7 +174,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -232,7 +257,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
                 testPeer.waitForAllHandlersToComplete(1000);
 
+                testPeer.expectOpen();
+                testPeer.expectClose();
+
                 connection.close();
+
+                testPeer.waitForAllHandlersToCompleteNoAssert(1000);
             }
         }
     }
@@ -259,7 +289,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -285,7 +320,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -311,7 +351,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToCompleteNoAssert(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -337,7 +382,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 
@@ -363,7 +413,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             testPeer.waitForAllHandlersToComplete(1000);
 
+            testPeer.expectOpen();
+            testPeer.expectClose();
+
             connection.close();
+
+            testPeer.waitForAllHandlersToCompleteNoAssert(1000);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
index 188e6d8..65e378d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
@@ -52,7 +52,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
     private static final String PASSWORD = "password";
 
-    @Test //(timeout = 20000)  // TODO
+    @Test(timeout = 20000)
     public void testSaslExternalConnection() throws Exception {
         TransportSslOptions sslOptions = new TransportSslOptions();
         sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 458db54..7ce6acd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -125,6 +125,9 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             fail("Should have thrown an error, no listener registered.");
         } catch (Exception ex) {
         }
+
+        testPeer.expectOpen();
+        testPeer.expectClose();
     }
 
     @Test(timeout=20000)
@@ -135,11 +138,16 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         provider.connect(connectionInfo);
         assertTrue(provider.toString().contains("localhost"));
         assertTrue(provider.toString().contains(String.valueOf(peerURI.getPort())));
+
+        testPeer.expectOpen();
+        testPeer.expectClose();
     }
 
     @Test(timeout=20000)
     public void testClosedProviderThrowsIOException() throws IOException {
         testPeer.expectSaslAnonymous();
+        testPeer.expectOpen();
+        testPeer.expectClose();
 
         provider = new AmqpProvider(peerURI);
         provider.connect(connectionInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org