You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/12/15 15:28:37 UTC
qpid-jms git commit: QPIDJMS-232: a little further reorg of the
connect sequence and some more tests
Repository: qpid-jms
Updated Branches:
refs/heads/master 1f2f4aa65 -> 3001facb7
QPIDJMS-232: a little further reorg of the connect sequence and some more tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3001facb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3001facb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3001facb
Branch: refs/heads/master
Commit: 3001facb74b3d3455a0d51520afe7e833fe8e5bd
Parents: 1f2f4aa
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Dec 15 15:23:36 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Dec 15 15:23:36 2016 +0000
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 66 +++++++++-----------
.../integration/ConnectionIntegrationTest.java | 18 ++++++
.../jms/integration/SslIntegrationTest.java | 31 +++++++++
.../jms/test/testpeer/HeaderHandlerImpl.java | 11 +++-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 27 ++++++--
.../jms/test/testpeer/TestAmqpPeerRunner.java | 16 +++++
6 files changed, 126 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 74d5bb4..88cd86c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -171,53 +171,47 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
connectionRequest = connectRequest;
- protonTransport.setEmitFlowEventOnSend(false);
+ try
+ {
+ protonTransport.setEmitFlowEventOnSend(false);
- if (getMaxFrameSize() > 0) {
- protonTransport.setMaxFrameSize(getMaxFrameSize());
- }
+ if (getMaxFrameSize() > 0) {
+ protonTransport.setMaxFrameSize(getMaxFrameSize());
+ }
- protonTransport.setChannelMax(getChannelMax());
- protonTransport.setIdleTimeout(idleTimeout);
- protonTransport.bind(protonConnection);
- protonConnection.collect(protonCollector);
+ protonTransport.setChannelMax(getChannelMax());
+ protonTransport.setIdleTimeout(idleTimeout);
+ protonTransport.bind(protonConnection);
+ protonConnection.collect(protonCollector);
- SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
+ SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
- try {
transport = TransportFactory.create(getTransportType(), getRemoteURI());
- } catch (Exception e) {
- connectionRequest.onFailure(IOExceptionSupport.create(e));
- }
-
- transport.setTransportListener(AmqpProvider.this);
-
- try {
+ transport.setTransportListener(AmqpProvider.this);
transport.connect(sslContextOverride);
- } catch (Exception e) {
- connectionRequest.onFailure(IOExceptionSupport.create(e));
- }
- if (saslLayer) {
- Sasl sasl = protonTransport.sasl();
- sasl.client();
+ if (saslLayer) {
+ Sasl sasl = protonTransport.sasl();
+ sasl.client();
- String hostname = getVhost();
- if (hostname == null) {
- hostname = remoteURI.getHost();
- } else if (hostname.isEmpty()) {
- hostname = null;
- }
+ String hostname = getVhost();
+ if (hostname == null) {
+ hostname = remoteURI.getHost();
+ } else if (hostname.isEmpty()) {
+ hostname = null;
+ }
- sasl.setRemoteHostname(hostname);
+ sasl.setRemoteHostname(hostname);
- authenticator = new AmqpSaslAuthenticator(connectionRequest, sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
- }
+ authenticator = new AmqpSaslAuthenticator(connectionRequest, sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
- if (saslLayer) {
- pumpToProtonTransport();
- } else {
- connectRequest.onSuccess();
+ pumpToProtonTransport();
+ } else {
+ connectRequest.onSuccess();
+ }
+ }
+ catch (Throwable t) {
+ connectionRequest.onFailure(IOExceptionSupport.create(t));
}
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 74fb5a3..782fb50 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -561,4 +561,22 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Test(timeout = 20000)
+ public void testCreateConnectionWithServerSendingPreemptiveData() throws Exception {
+ boolean sendServerSaslHeaderPreEmptively = true;
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(null, false, sendServerSaslHeaderPreEmptively);) {
+ // Don't use test fixture, handle the connection directly to control sasl behaviour
+ testPeer.expectSaslAnonymousWithPreEmptiveServerHeader();
+ testPeer.expectOpen();
+ testPeer.expectBegin();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
index 41128d1..5289a48 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
@@ -89,6 +89,37 @@ public class SslIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCreateSslConnectionWithServerSendingPreemptiveData() throws Exception {
+ TransportSslOptions serverSslOptions = new TransportSslOptions();
+ serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ serverSslOptions.setKeyStorePassword(PASSWORD);
+ serverSslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+ boolean sendServerSaslHeaderPreEmptively = true;
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(serverSslContext, false, sendServerSaslHeaderPreEmptively);) {
+ // Don't use test fixture, handle the connection directly to control sasl behaviour
+ testPeer.expectSaslAnonymousWithPreEmptiveServerHeader();
+ testPeer.expectOpen();
+ testPeer.expectBegin();
+
+ String connOptions = "?transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+ "transport.trustStorePassword=" + PASSWORD;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + connOptions);
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ Socket socket = testPeer.getClientSocket();
+ assertTrue(socket instanceof SSLSocket);
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateAndCloseSslConnectionWithClientAuth() throws Exception {
TransportSslOptions sslOptions = new TransportSslOptions();
sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
index 66cabf8..39a12ed 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
@@ -60,8 +60,15 @@ class HeaderHandlerImpl implements HeaderHandler
peer.assertionFailed(ae);
}
- LOGGER.debug("Sending header response.");
- peer.sendHeader(_response);
+ if (_response == null || _response.length == 0)
+ {
+ LOGGER.debug("Skipping header response as none was instructed");
+ }
+ else
+ {
+ LOGGER.debug("Sending header response: " + new Binary(_response));
+ peer.sendHeader(_response);
+ }
if(_onCompletion != null)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index a69906b..89370e0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -147,7 +147,13 @@ public class TestAmqpPeer implements AutoCloseable
public TestAmqpPeer(SSLContext context, boolean needClientCert) throws IOException
{
+ this(context, needClientCert, false);
+ }
+
+ public TestAmqpPeer(SSLContext context, boolean needClientCert, boolean sendSaslHeaderPreEmptively) throws IOException
+ {
_driverRunnable = new TestAmqpPeerRunner(this, context, needClientCert);
+ _driverRunnable.setSendSaslHeaderPreEmptively(sendSaslHeaderPreEmptively);
_driverThread = new Thread(_driverRunnable, "MockAmqpPeer-" + _driverRunnable.getServerPort());
_driverThread.start();
}
@@ -421,10 +427,14 @@ public class TestAmqpPeer implements AutoCloseable
return openFrame;
}
- public void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher)
+ private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse)
{
SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
- addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
+ byte[] saslHeaderResponse = null;
+ if(sendSaslHeaderResponse) {
+ saslHeaderResponse = AmqpHeader.SASL_HEADER;
+ }
+ addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, saslHeaderResponse,
new FrameSender(
this, FrameType.SASL, 0,
saslMechanismsFrame, null)));
@@ -469,7 +479,7 @@ public class TestAmqpPeer implements AutoCloseable
Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
- expectSaslAuthentication(PLAIN, initialResponseMatcher, null);
+ expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true);
}
public void expectSaslExternal()
@@ -479,7 +489,7 @@ public class TestAmqpPeer implements AutoCloseable
throw new IllegalStateException("need-client-cert must be enabled on the test peer");
}
- expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null);
+ expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true);
}
public void expectSaslAnonymous()
@@ -489,7 +499,14 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSaslAnonymous(Matcher<?> hostnameMatcher)
{
- expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher);
+ expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true);
+ }
+
+ public void expectSaslAnonymousWithPreEmptiveServerHeader()
+ {
+ assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true));
+ boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively
+ expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse);
}
public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech)
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 9dfdedc..38005b0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLServerSocketFactory;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.engine.impl.AmqpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ class TestAmqpPeerRunner implements Runnable
private final TestFrameParser _testFrameParser;
private volatile boolean _suppressReadExceptionOnClose;
private volatile boolean _exitReadLoopEarly;
+ private volatile boolean _sendSaslHeaderPreEmptively;
private volatile Throwable _throwable;
@@ -95,6 +97,12 @@ class TestAmqpPeerRunner implements Runnable
_clientSocket = clientSocket;
_networkOutputStream = networkOutputStream;
+ if (_sendSaslHeaderPreEmptively) {
+ byte[] bytes = AmqpHeader.SASL_HEADER;
+ LOGGER.debug("Sending header pre-emptively: {}", new Binary(bytes));
+ _networkOutputStream.write(bytes);
+ }
+
int bytesRead;
byte[] networkInputBytes = new byte[1024];
@@ -242,4 +250,12 @@ class TestAmqpPeerRunner implements Runnable
public void exitReadLoopEarly() {
_exitReadLoopEarly = true;
}
+
+ public void setSendSaslHeaderPreEmptively(boolean sendSaslHeaderPreEmptively) {
+ _sendSaslHeaderPreEmptively = sendSaslHeaderPreEmptively;
+ }
+
+ public boolean isSendSaslHeaderPreEmptively() {
+ return _sendSaslHeaderPreEmptively;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org