You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/12/15 15:28:37 UTC

qpid-jms git commit: QPIDJMS-232: a little further reorg of the connect sequence and some more tests

Repository: qpid-jms
Updated Branches:
  refs/heads/master 1f2f4aa65 -> 3001facb7


QPIDJMS-232: a little further reorg of the connect sequence and some more tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3001facb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3001facb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3001facb

Branch: refs/heads/master
Commit: 3001facb74b3d3455a0d51520afe7e833fe8e5bd
Parents: 1f2f4aa
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Dec 15 15:23:36 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Dec 15 15:23:36 2016 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 66 +++++++++-----------
 .../integration/ConnectionIntegrationTest.java  | 18 ++++++
 .../jms/integration/SslIntegrationTest.java     | 31 +++++++++
 .../jms/test/testpeer/HeaderHandlerImpl.java    | 11 +++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 27 ++++++--
 .../jms/test/testpeer/TestAmqpPeerRunner.java   | 16 +++++
 6 files changed, 126 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 74d5bb4..88cd86c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -171,53 +171,47 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
                 connectionRequest = connectRequest;
 
-                protonTransport.setEmitFlowEventOnSend(false);
+                try
+                {
+                    protonTransport.setEmitFlowEventOnSend(false);
 
-                if (getMaxFrameSize() > 0) {
-                    protonTransport.setMaxFrameSize(getMaxFrameSize());
-                }
+                    if (getMaxFrameSize() > 0) {
+                        protonTransport.setMaxFrameSize(getMaxFrameSize());
+                    }
 
-                protonTransport.setChannelMax(getChannelMax());
-                protonTransport.setIdleTimeout(idleTimeout);
-                protonTransport.bind(protonConnection);
-                protonConnection.collect(protonCollector);
+                    protonTransport.setChannelMax(getChannelMax());
+                    protonTransport.setIdleTimeout(idleTimeout);
+                    protonTransport.bind(protonConnection);
+                    protonConnection.collect(protonCollector);
 
-                SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
+                    SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
 
-                try {
                     transport = TransportFactory.create(getTransportType(), getRemoteURI());
-                } catch (Exception e) {
-                    connectionRequest.onFailure(IOExceptionSupport.create(e));
-                }
-
-                transport.setTransportListener(AmqpProvider.this);
-
-                try {
+                    transport.setTransportListener(AmqpProvider.this);
                     transport.connect(sslContextOverride);
-                } catch (Exception e) {
-                    connectionRequest.onFailure(IOExceptionSupport.create(e));
-                }
 
-                if (saslLayer) {
-                    Sasl sasl = protonTransport.sasl();
-                    sasl.client();
+                    if (saslLayer) {
+                        Sasl sasl = protonTransport.sasl();
+                        sasl.client();
 
-                    String hostname = getVhost();
-                    if (hostname == null) {
-                        hostname = remoteURI.getHost();
-                    } else if (hostname.isEmpty()) {
-                        hostname = null;
-                    }
+                        String hostname = getVhost();
+                        if (hostname == null) {
+                            hostname = remoteURI.getHost();
+                        } else if (hostname.isEmpty()) {
+                            hostname = null;
+                        }
 
-                    sasl.setRemoteHostname(hostname);
+                        sasl.setRemoteHostname(hostname);
 
-                    authenticator = new AmqpSaslAuthenticator(connectionRequest, sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
-                }
+                        authenticator = new AmqpSaslAuthenticator(connectionRequest, sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
 
-                if (saslLayer) {
-                    pumpToProtonTransport();
-                } else {
-                    connectRequest.onSuccess();
+                        pumpToProtonTransport();
+                    } else {
+                        connectRequest.onSuccess();
+                    }
+                }
+                catch (Throwable t) {
+                    connectionRequest.onFailure(IOExceptionSupport.create(t));
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 74fb5a3..782fb50 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -561,4 +561,22 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testCreateConnectionWithServerSendingPreemptiveData() throws Exception {
+        boolean sendServerSaslHeaderPreEmptively = true;
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(null, false, sendServerSaslHeaderPreEmptively);) {
+            // Don't use test fixture, handle the connection directly to control sasl behaviour
+            testPeer.expectSaslAnonymousWithPreEmptiveServerHeader();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
index 41128d1..5289a48 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java
@@ -89,6 +89,37 @@ public class SslIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateSslConnectionWithServerSendingPreemptiveData() throws Exception {
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        boolean sendServerSaslHeaderPreEmptively = true;
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(serverSslContext, false, sendServerSaslHeaderPreEmptively);) {
+            // Don't use test fixture, handle the connection directly to control sasl behaviour
+            testPeer.expectSaslAnonymousWithPreEmptiveServerHeader();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            String connOptions = "?transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+                                  "transport.trustStorePassword=" + PASSWORD;
+
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + connOptions);
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            Socket socket = testPeer.getClientSocket();
+            assertTrue(socket instanceof SSLSocket);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateAndCloseSslConnectionWithClientAuth() throws Exception {
         TransportSslOptions sslOptions = new TransportSslOptions();
         sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
index 66cabf8..39a12ed 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/HeaderHandlerImpl.java
@@ -60,8 +60,15 @@ class HeaderHandlerImpl implements HeaderHandler
             peer.assertionFailed(ae);
         }
 
-        LOGGER.debug("Sending header response.");
-        peer.sendHeader(_response);
+        if (_response == null || _response.length == 0)
+        {
+            LOGGER.debug("Skipping header response as none was instructed");
+        }
+        else
+        {
+            LOGGER.debug("Sending header response: " + new Binary(_response));
+            peer.sendHeader(_response);
+        }
 
         if(_onCompletion != null)
         {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index a69906b..89370e0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -147,7 +147,13 @@ public class TestAmqpPeer implements AutoCloseable
 
     public TestAmqpPeer(SSLContext context, boolean needClientCert) throws IOException
     {
+        this(context, needClientCert, false);
+    }
+
+    public TestAmqpPeer(SSLContext context, boolean needClientCert, boolean sendSaslHeaderPreEmptively) throws IOException
+    {
         _driverRunnable = new TestAmqpPeerRunner(this, context, needClientCert);
+        _driverRunnable.setSendSaslHeaderPreEmptively(sendSaslHeaderPreEmptively);
         _driverThread = new Thread(_driverRunnable, "MockAmqpPeer-" + _driverRunnable.getServerPort());
         _driverThread.start();
     }
@@ -421,10 +427,14 @@ public class TestAmqpPeer implements AutoCloseable
         return openFrame;
     }
 
-    public void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher)
+    private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
-        addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
+        byte[] saslHeaderResponse = null;
+        if(sendSaslHeaderResponse) {
+            saslHeaderResponse = AmqpHeader.SASL_HEADER;
+        }
+        addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, saslHeaderResponse,
                                             new FrameSender(
                                                     this, FrameType.SASL, 0,
                                                     saslMechanismsFrame, null)));
@@ -469,7 +479,7 @@ public class TestAmqpPeer implements AutoCloseable
 
         Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
 
-        expectSaslAuthentication(PLAIN, initialResponseMatcher, null);
+        expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true);
     }
 
     public void expectSaslExternal()
@@ -479,7 +489,7 @@ public class TestAmqpPeer implements AutoCloseable
             throw new IllegalStateException("need-client-cert must be enabled on the test peer");
         }
 
-        expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null);
+        expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true);
     }
 
     public void expectSaslAnonymous()
@@ -489,7 +499,14 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSaslAnonymous(Matcher<?> hostnameMatcher)
     {
-        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher);
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true);
+    }
+
+    public void expectSaslAnonymousWithPreEmptiveServerHeader()
+    {
+        assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true));
+        boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse);
     }
 
     public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3001facb/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 9dfdedc..38005b0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLServerSocketFactory;
 
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.engine.impl.AmqpHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +52,7 @@ class TestAmqpPeerRunner implements Runnable
     private final TestFrameParser _testFrameParser;
     private volatile boolean _suppressReadExceptionOnClose;
     private volatile boolean _exitReadLoopEarly;
+    private volatile boolean _sendSaslHeaderPreEmptively;
 
     private volatile Throwable _throwable;
 
@@ -95,6 +97,12 @@ class TestAmqpPeerRunner implements Runnable
             _clientSocket = clientSocket;
             _networkOutputStream = networkOutputStream;
 
+            if (_sendSaslHeaderPreEmptively) {
+                byte[] bytes = AmqpHeader.SASL_HEADER;
+                LOGGER.debug("Sending header pre-emptively: {}", new Binary(bytes));
+                _networkOutputStream.write(bytes);
+            }
+
             int bytesRead;
             byte[] networkInputBytes = new byte[1024];
 
@@ -242,4 +250,12 @@ class TestAmqpPeerRunner implements Runnable
     public void exitReadLoopEarly() {
         _exitReadLoopEarly = true;
     }
+
+    public void setSendSaslHeaderPreEmptively(boolean sendSaslHeaderPreEmptively) {
+        _sendSaslHeaderPreEmptively = sendSaslHeaderPreEmptively;
+    }
+
+    public boolean isSendSaslHeaderPreEmptively() {
+        return _sendSaslHeaderPreEmptively;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org