You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/08 17:36:53 UTC

qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 97ebcc8ef -> 89e01ec7e


QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/89e01ec7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/89e01ec7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/89e01ec7

Branch: refs/heads/master
Commit: 89e01ec7e59bf097a74f770be2aad5bf292a6061
Parents: 97ebcc8
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 17:33:00 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 8 17:33:00 2017 +0000

----------------------------------------------------------------------
 .../protocol/v1_0/framing/FrameHandler.java     | 11 ++++-
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  | 39 ++++++++-------
 .../v1_0/transport/security/sasl/SaslTest.java  | 50 +++++++++++++++-----
 3 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index f7e4029..23d08c3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -202,19 +202,26 @@ public class FrameHandler implements ProtocolHandler
                             return frameBody;
                         }
                     });
+
+                    if (_isSasl)
+                    {
+                        break;
+                    }
                 }
                 catch (AmqpErrorException ex)
                 {
                     frameParsingError = ex.getError();
                 }
             }
-            _connectionHandler.receive(channelFrameBodies);
 
             if (frameParsingError != null)
             {
                 _connectionHandler.handleError(frameParsingError);
                 _errored = true;
-
+            }
+            else
+            {
+                _connectionHandler.receive(channelFrameBodies);
             }
         }
         catch (RuntimeException e)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 4dc06cb..0c94ad7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -90,27 +90,34 @@ public class FrameDecoder implements InputDecoder
     @Override
     public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
     {
-        List<Response<?>> responses = new ArrayList<>();
+
         QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
-        switch(_state)
+        int remaining;
+
+        do
         {
-            case HEADER:
-                if (inputBuffer.remaining() >= 8)
-                {
-                    byte[] header = new byte[8];
-                    inputBuffer.get(header);
-                    responses.add(new HeaderResponse(header));
-                    _state = ParsingState.PERFORMATIVES;
+            remaining = qpidByteBuffer.remaining();
+            switch(_state)
+            {
+                case HEADER:
+                    if (inputBuffer.remaining() >= 8)
+                    {
+                        byte[] header = new byte[8];
+                        inputBuffer.get(header);
+                        _connectionHandler._responseQueue.add(new HeaderResponse(header));
+                        _state = ParsingState.PERFORMATIVES;
+                    }
+                    break;
+                case PERFORMATIVES:
                     _frameHandler.parse(qpidByteBuffer);
-                }
-                break;
-            case PERFORMATIVES:
-                _frameHandler.parse(qpidByteBuffer);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected state : " + _state);
+                    break;
+                default:
+                    throw new IllegalStateException("Unexpected state : " + _state);
+            }
         }
+        while (qpidByteBuffer.remaining() != remaining);
 
+        List<Response<?>> responses = new ArrayList<>();
         Response<?> r;
         while((r = _connectionHandler._responseQueue.poll())!=null)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1ca3f20..1b81507 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -36,18 +37,23 @@ import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import javax.xml.bind.DatatypeConverter;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -106,7 +112,6 @@ public class SaslTest extends BrokerAdminUsingTestBase
         }
     }
 
-    @Ignore("QPID-8042")
     @Test
     @SpecificationTest(section = "2.4.2",
             description = "For applications that use many short-lived connections,"
@@ -118,17 +123,38 @@ public class SaslTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
         try (FrameTransport transport = new FrameTransport(addr, true).connect())
         {
-            final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
             final Interaction interaction = transport.newInteraction();
-            interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
-                       .negotiateProtocol()
-                       .saslMechanism(PLAIN)
-                       .saslInitialResponse(initialResponse)
-                       .saslInit()
-                       .protocolHeader(AMQP_HEADER_BYTES)
-                       .negotiateProtocol()
-                       .openContainerId("testContainerId")
-                       .open();
+            FrameEncoder frameEncoder = new FrameEncoder();
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
+                                                         .getBytes(StandardCharsets.US_ASCII)));
+            ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
+
+            Open open = new Open();
+            open.setContainerId("containerId");
+            ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
+
+            int initSize = saslInitByteBuffer.remaining();
+            int openSize = openByteBuffer.remaining();
+            int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
+            byte[] data = new byte[dataLength];
+
+            System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
+            saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
+            System.arraycopy(AMQP_HEADER_BYTES,
+                             0,
+                             data,
+                             SASL_AMQP_HEADER_BYTES.length + initSize,
+                             AMQP_HEADER_BYTES.length);
+            openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
+
+            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+            buffer.writeBytes(data);
+
+            transport.sendPerformative(buffer);
+
 
             final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
             assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org