You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/16 18:16:59 UTC
qpid-jms git commit: QPIDJMS-232 Perform an open / close on
Connection close if not done yet
Repository: qpid-jms
Updated Branches:
refs/heads/master 6b56819ea -> 8d211bc8e
QPIDJMS-232 Perform an open / close on Connection close if not done yet
For a connection that was authenticated but never used or lacking a
client ID on the URI we want to do a normal open / close on the AMQP
connection when Connection close is called to complete a normal
connection cycle with the remote before closing the transport down.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8d211bc8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8d211bc8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8d211bc8
Branch: refs/heads/master
Commit: 8d211bc8ee8c7d97658a9647513b2ad9b92d2d5f
Parents: 6b56819
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Dec 16 13:16:42 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Dec 16 13:16:42 2016 -0500
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 33 ++++++++---
.../builders/AmqpClosedConnectionBuilder.java | 58 ++++++++++++++++++++
.../amqp/builders/AmqpConnectionBuilder.java | 7 ++-
.../ConnectionFactoryIntegrationTest.java | 55 +++++++++++++++++++
.../jms/integration/SaslIntegrationTest.java | 2 +-
.../jms/provider/amqp/AmqpProviderTest.java | 8 +++
6 files changed, 152 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 88cd86c..63bca3a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -53,6 +53,7 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.transports.TransportListener;
@@ -170,9 +171,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
public void run() {
connectionRequest = connectRequest;
+ AmqpProvider.this.connectionInfo = connectionInfo;
- try
- {
+ try {
protonTransport.setEmitFlowEventOnSend(false);
if (getMaxFrameSize() > 0) {
@@ -209,8 +210,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
} else {
connectRequest.onSuccess();
}
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
connectionRequest.onFailure(IOExceptionSupport.create(t));
}
}
@@ -254,14 +254,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
// just signal success.
if (transport == null || !transport.isConnected()) {
request.onSuccess();
+ return;
}
if (connection != null) {
connection.close(request);
- pumpToProtonTransport(request);
} else {
- request.onSuccess();
+ // If the SASL authentication occurred but failed then we don't
+ // need to do an open / close
+ if (authenticator != null && !authenticator.wasSuccessful()) {
+ request.onSuccess();
+ return;
+ }
+
+ // Connection attempt might have been tried and failed so only perform
+ // an open / close cycle if one hasn't been done already.
+ if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
+ AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
+ builder.buildResource(request);
+
+ protonConnection.setContext(builder);
+ } else {
+ request.onSuccess();
+ }
}
+
+ pumpToProtonTransport(request);
} catch (Exception e) {
LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
} finally {
@@ -912,8 +930,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
// Close the transport to avoid emitting any additional frames.
org.apache.qpid.proton.engine.Transport t = protonConnection.getTransport();
t.close_head();
+ } else {
+ authenticator = null;
}
- authenticator = null;
}
} catch (Throwable ex) {
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
new file mode 100644
index 0000000..ba940df
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpClosedConnectionBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+
+/**
+ * Specialized Builder that create a Connection that is intended to be immediately
+ * closed.
+ */
+public class AmqpClosedConnectionBuilder extends AmqpConnectionBuilder {
+
+ public AmqpClosedConnectionBuilder(AmqpProvider parent, JmsConnectionInfo resourceInfo) {
+ super(parent, resourceInfo);
+ }
+
+ @Override
+ protected AsyncResult createRequestIntercepter(final AsyncResult request) {
+ return request;
+ }
+
+ @Override
+ protected void afterOpened() {
+ getEndpoint().close();
+ }
+
+ @Override
+ protected void afterClosed(AmqpConnection resource, JmsConnectionInfo resourceInfo) {
+ // If the resource closed and no error was given, we just closed it now to avoid
+ // failing the request with a default error which creates log spam.
+ if (!hasRemoteError()) {
+ request.onSuccess();
+ }
+ }
+
+ @Override
+ protected boolean isClosePending() {
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index 71a4dd4..5792c09 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -48,8 +48,11 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
@Override
public void buildResource(final AsyncResult request) {
+ super.buildResource(createRequestIntercepter(request));
+ }
- AsyncResult connectionRequest = new AsyncResult() {
+ protected AsyncResult createRequestIntercepter(final AsyncResult request) {
+ return new AsyncResult() {
@Override
public void onSuccess() {
@@ -90,8 +93,6 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
return getResource().isOpen();
}
};
-
- super.buildResource(connectionRequest);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index dc11976..3897186 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -72,7 +72,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -91,7 +96,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -110,7 +120,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -129,7 +144,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -154,7 +174,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -232,7 +257,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
}
@@ -259,7 +289,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -285,7 +320,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -311,7 +351,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToCompleteNoAssert(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -337,7 +382,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
@@ -363,7 +413,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
connection.close();
+
+ testPeer.waitForAllHandlersToCompleteNoAssert(1000);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
index 188e6d8..65e378d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
@@ -52,7 +52,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
private static final String PASSWORD = "password";
- @Test //(timeout = 20000) // TODO
+ @Test(timeout = 20000)
public void testSaslExternalConnection() throws Exception {
TransportSslOptions sslOptions = new TransportSslOptions();
sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8d211bc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 458db54..7ce6acd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -125,6 +125,9 @@ public class AmqpProviderTest extends QpidJmsTestCase {
fail("Should have thrown an error, no listener registered.");
} catch (Exception ex) {
}
+
+ testPeer.expectOpen();
+ testPeer.expectClose();
}
@Test(timeout=20000)
@@ -135,11 +138,16 @@ public class AmqpProviderTest extends QpidJmsTestCase {
provider.connect(connectionInfo);
assertTrue(provider.toString().contains("localhost"));
assertTrue(provider.toString().contains(String.valueOf(peerURI.getPort())));
+
+ testPeer.expectOpen();
+ testPeer.expectClose();
}
@Test(timeout=20000)
public void testClosedProviderThrowsIOException() throws IOException {
testPeer.expectSaslAnonymous();
+ testPeer.expectOpen();
+ testPeer.expectClose();
provider = new AmqpProvider(peerURI);
provider.connect(connectionInfo);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org