You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/03/09 17:20:46 UTC

qpid-jms git commit: QPIDJMS-272: better handle servers using unexpected protocol type

Repository: qpid-jms
Updated Branches:
  refs/heads/master 812a046ed -> 4624e5e60


QPIDJMS-272: better handle servers using unexpected protocol type


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4624e5e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4624e5e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4624e5e6

Branch: refs/heads/master
Commit: 4624e5e6057d82fbc28f3c2b41a9a1510180c92d
Parents: 812a046
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Mar 9 17:19:14 2017 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Mar 9 17:19:14 2017 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 44 +++++++++++---------
 .../provider/amqp/AmqpSaslAuthenticator.java    |  9 +++-
 .../integration/ConnectionIntegrationTest.java  | 29 +++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  5 +++
 4 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4624e5e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index d5d5ade..b571244 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -263,7 +263,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         } else {
                             // If the SASL authentication occurred but failed then we don't
                             // need to do an open / close
-                            if (authenticator != null && !authenticator.wasSuccessful()) {
+                            if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
                                 request.onSuccess();
                                 return;
                             }
@@ -765,28 +765,34 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
             @Override
             public void run() {
-                if (isTraceBytes()) {
-                    TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
-                }
+                try {
+                    if (isTraceBytes()) {
+                        TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
+                    }
 
-                ByteBuffer source = input.nioBuffer();
+                    ByteBuffer source = input.nioBuffer();
 
-                do {
-                    ByteBuffer buffer = protonTransport.getInputBuffer();
-                    int limit = Math.min(buffer.remaining(), source.remaining());
-                    ByteBuffer duplicate = source.duplicate();
-                    duplicate.limit(source.position() + limit);
-                    buffer.put(duplicate);
-                    protonTransport.processInput();
-                    source.position(source.position() + limit);
-                } while (source.hasRemaining());
+                    do {
+                        ByteBuffer buffer = protonTransport.getInputBuffer();
+                        int limit = Math.min(buffer.remaining(), source.remaining());
+                        ByteBuffer duplicate = source.duplicate();
+                        duplicate.limit(source.position() + limit);
+                        buffer.put(duplicate);
+                        protonTransport.processInput().checkIsOk();
+                        source.position(source.position() + limit);
+                    } while (source.hasRemaining());
 
-                ReferenceCountUtil.release(input);
+                    ReferenceCountUtil.release(input);
 
-                // Process the state changes from the latest data and then answer back
-                // any pending updates to the Broker.
-                processUpdates();
-                pumpToProtonTransport();
+                    // Process the state changes from the latest data and then answer back
+                    // any pending updates to the Broker.
+                    processUpdates();
+                    pumpToProtonTransport();
+                } catch (Throwable t) {
+                    LOG.warn("Caught problem during data processing: {}", t.getMessage(), t);
+
+                    fireProviderException(t);
+                }
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4624e5e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
index 9ee41e5..88689db 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
@@ -40,6 +40,7 @@ public class AmqpSaslAuthenticator {
     private final Principal localPrincipal;
     private Set<String> mechanismsRestriction;
     private final AsyncResult authenticationRequest;
+    private boolean complete;
 
     /**
      * Create the authenticator and initialize it.
@@ -106,7 +107,13 @@ public class AmqpSaslAuthenticator {
             authenticationRequest.onFailure(result);
         }
 
-        return authenticationRequest.isComplete();
+        complete = authenticationRequest.isComplete();
+
+        return complete;
+    }
+
+    public boolean isComplete() {
+       return complete;
     }
 
     public boolean wasSuccessful() throws IllegalStateException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4624e5e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index e5b752c..76c1443 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.jms.integration;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -72,6 +74,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.engine.impl.AmqpHeader;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
@@ -87,6 +90,32 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 10000)
+    public void testCreateConnectionToNonSaslPeer() throws Exception {
+        doConnectionWithUnexpectedHeaderTestImpl(AmqpHeader.HEADER);
+    }
+
+    @Test(timeout = 10000)
+    public void testCreateConnectionToNonAmqpPeer() throws Exception {
+        byte[] responseHeader = new byte[] { 'N', 'O', 'T', '-', 'A', 'M', 'Q', 'P' };
+        doConnectionWithUnexpectedHeaderTestImpl(responseHeader);
+    }
+
+    private void doConnectionWithUnexpectedHeaderTestImpl(byte[] responseHeader) throws Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            testPeer.expectHeader(AmqpHeader.SASL_HEADER, responseHeader);
+
+            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
+            try {
+                factory.createConnection("guest", "guest");
+                fail("Expected connection creation to fail");
+            } catch (JMSException jmse) {
+                assertThat(jmse.getMessage(), containsString("SASL header mismatch"));
+            }
+        }
+    }
+
     @Test(timeout = 20000)
     public void testCloseConnectionTimesOut() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4624e5e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 719fbc7..f6ebc7a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -433,6 +433,11 @@ public class TestAmqpPeer implements AutoCloseable
         return openFrame;
     }
 
+    public void expectHeader(byte[] header, byte[] response)
+    {
+        addHandler(new HeaderHandlerImpl(header, response));
+    }
+
     private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org