You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/08 17:36:53 UTC
qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 97ebcc8ef -> 89e01ec7e
QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/89e01ec7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/89e01ec7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/89e01ec7
Branch: refs/heads/master
Commit: 89e01ec7e59bf097a74f770be2aad5bf292a6061
Parents: 97ebcc8
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 17:33:00 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 8 17:33:00 2017 +0000
----------------------------------------------------------------------
.../protocol/v1_0/framing/FrameHandler.java | 11 ++++-
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 39 ++++++++-------
.../v1_0/transport/security/sasl/SaslTest.java | 50 +++++++++++++++-----
3 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index f7e4029..23d08c3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -202,19 +202,26 @@ public class FrameHandler implements ProtocolHandler
return frameBody;
}
});
+
+ if (_isSasl)
+ {
+ break;
+ }
}
catch (AmqpErrorException ex)
{
frameParsingError = ex.getError();
}
}
- _connectionHandler.receive(channelFrameBodies);
if (frameParsingError != null)
{
_connectionHandler.handleError(frameParsingError);
_errored = true;
-
+ }
+ else
+ {
+ _connectionHandler.receive(channelFrameBodies);
}
}
catch (RuntimeException e)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 4dc06cb..0c94ad7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -90,27 +90,34 @@ public class FrameDecoder implements InputDecoder
@Override
public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
{
- List<Response<?>> responses = new ArrayList<>();
+
QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
- switch(_state)
+ int remaining;
+
+ do
{
- case HEADER:
- if (inputBuffer.remaining() >= 8)
- {
- byte[] header = new byte[8];
- inputBuffer.get(header);
- responses.add(new HeaderResponse(header));
- _state = ParsingState.PERFORMATIVES;
+ remaining = qpidByteBuffer.remaining();
+ switch(_state)
+ {
+ case HEADER:
+ if (inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ inputBuffer.get(header);
+ _connectionHandler._responseQueue.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ }
+ break;
+ case PERFORMATIVES:
_frameHandler.parse(qpidByteBuffer);
- }
- break;
- case PERFORMATIVES:
- _frameHandler.parse(qpidByteBuffer);
- break;
- default:
- throw new IllegalStateException("Unexpected state : " + _state);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
}
+ while (qpidByteBuffer.remaining() != remaining);
+ List<Response<?>> responses = new ArrayList<>();
Response<?> r;
while((r = _connectionHandler._responseQueue.poll())!=null)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/89e01ec7/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1ca3f20..1b81507 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -36,18 +37,23 @@ import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -106,7 +112,6 @@ public class SaslTest extends BrokerAdminUsingTestBase
}
}
- @Ignore("QPID-8042")
@Test
@SpecificationTest(section = "2.4.2",
description = "For applications that use many short-lived connections,"
@@ -118,17 +123,38 @@ public class SaslTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
- final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
final Interaction interaction = transport.newInteraction();
- interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
- .negotiateProtocol()
- .saslMechanism(PLAIN)
- .saslInitialResponse(initialResponse)
- .saslInit()
- .protocolHeader(AMQP_HEADER_BYTES)
- .negotiateProtocol()
- .openContainerId("testContainerId")
- .open();
+ FrameEncoder frameEncoder = new FrameEncoder();
+
+ SaslInit saslInit = new SaslInit();
+ saslInit.setMechanism(PLAIN);
+ saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
+ .getBytes(StandardCharsets.US_ASCII)));
+ ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
+
+ Open open = new Open();
+ open.setContainerId("containerId");
+ ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
+
+ int initSize = saslInitByteBuffer.remaining();
+ int openSize = openByteBuffer.remaining();
+ int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
+ byte[] data = new byte[dataLength];
+
+ System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
+ saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
+ System.arraycopy(AMQP_HEADER_BYTES,
+ 0,
+ data,
+ SASL_AMQP_HEADER_BYTES.length + initSize,
+ AMQP_HEADER_BYTES.length);
+ openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
+
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(data);
+
+ transport.sendPerformative(buffer);
+
final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org