You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/04/12 16:26:40 UTC
svn commit: r1791140 - in
/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src:
main/java/org/apache/qpid/tests/protocol/v1_0/
test/java/org/apache/qpid/tests/protocol/v1_0/messaging/
test/java/org/apache/qpid/tests/protocol/v1_0/transport/ test/java/...
Author: lquack
Date: Wed Apr 12 16:26:40 2017
New Revision: 1791140
URL: http://svn.apache.org/viewvc?rev=1791140&view=rev
Log:
QPID-7665: [Java Broker] Improvements to AMQP 1.0 Protocol Tests
* add FlowTest and TransferTest
Added:
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
Modified:
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java Wed Apr 12 16:26:40 2017
@@ -26,6 +26,8 @@ import org.apache.qpid.server.plugin.Plu
public interface BrokerAdmin extends Pluggable
{
+ String TEST_QUEUE_NAME = "testQueue";
+
void beforeTestClass(final Class testClass);
void beforeTestMethod(final Class testClass, final Method method);
void afterTestMethod(final Class testClass, final Method method);
@@ -35,6 +37,7 @@ public interface BrokerAdmin extends Plu
void createQueue(String queueName);
void deleteQueue(String queueName);
+ void putMessageOnQueue(String queueName, String... messages);
enum PortType
{
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java Wed Apr 12 16:26:40 2017
@@ -43,6 +43,7 @@ import org.apache.qpid.server.logging.lo
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Container;
import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManageableMessage;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Queue;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.store.MemoryConfigurationStore;
import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
@PluggableService
@@ -144,6 +146,11 @@ public class EmbeddedBrokerPerClassAdmin
{
_currentVirtualHostNode.delete();
}
+ else
+ {
+ _currentVirtualHostNode.setAttributes(Collections.singletonMap(VirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE,
+ false));
+ }
}
@Override
@@ -185,6 +192,101 @@ public class EmbeddedBrokerPerClassAdmin
}
@Override
+ public void putMessageOnQueue(final String queueName, final String... messages)
+ {
+ for (String message : messages)
+ {
+ ((QueueManagingVirtualHost<?>) _currentVirtualHostNode.getVirtualHost()).publishMessage(new ManageableMessage()
+ {
+ @Override
+ public String getAddress()
+ {
+ return queueName;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Date getExpiration()
+ {
+ return null;
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return null;
+ }
+
+ @Override
+ public String getAppId()
+ {
+ return null;
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return null;
+ }
+
+ @Override
+ public String getMimeType()
+ {
+ return "text/plain";
+ }
+
+ @Override
+ public String getEncoding()
+ {
+ return null;
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return 0;
+ }
+
+ @Override
+ public Date getNotValidBefore()
+ {
+ return null;
+ }
+
+ @Override
+ public String getReplyTo()
+ {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> getHeaders()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getContent()
+ {
+ return message;
+ }
+
+ @Override
+ public String getContentTransferEncoding()
+ {
+ return null;
+ }
+ });
+ }
+
+ }
+
+ @Override
public String getType()
{
return "EMBEDDED_BROKER_PER_CLASS";
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java Wed Apr 12 16:26:40 2017
@@ -92,6 +92,12 @@ public class ExternalQpidBrokerAdminImpl
}
@Override
+ public void putMessageOnQueue(final String queueName, final String... messages)
+ {
+ LOGGER.debug(String.format("puting of %d messages on queue '%s' requested", messages.length, queueName));
+ }
+
+ @Override
public String getType()
{
return "EXTERNAL_BROKER";
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java Wed Apr 12 16:26:40 2017
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -44,25 +45,32 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.core.Is;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class FrameTransport implements AutoCloseable
{
- private static final long RESPONSE_TIMEOUT = 6000;
- private static final Set<Integer> _amqpConnectionIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ public static final long RESPONSE_TIMEOUT = 6000;
private final Channel _channel;
private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
@@ -108,7 +116,7 @@ public class FrameTransport implements A
}
finally
{
- _amqpConnectionIds.remove(_amqpConnectionId);
+ AMQP_CONNECTION_IDS.remove(_amqpConnectionId);
_workerGroup.shutdownGracefully();
}
}
@@ -122,8 +130,10 @@ public class FrameTransport implements A
return JdkFutureAdapters.listenInPoolThread(channelFuture);
}
- public ListenableFuture<Void> sendPerformative(final TransportFrame transportFrame) throws Exception
+ public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
{
+ final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer)frameBody).getPayload() : null;
+ TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
channelFuture.sync();
return JdkFutureAdapters.listenInPoolThread(channelFuture);
@@ -131,8 +141,30 @@ public class FrameTransport implements A
public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
{
- TransportFrame transportFrame = new TransportFrame(_amqpChannelId, frameBody);
- return sendPerformative(transportFrame);
+ return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
+ }
+
+ public ListenableFuture<Void> sendPipelined(final byte[] protocolHeader, final TransportFrame... frames)
+ throws InterruptedException
+ {
+ ChannelPromise promise = _channel.newPromise();
+ if (protocolHeader != null)
+ {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(protocolHeader);
+ _channel.write(buffer);
+ }
+ for (TransportFrame frame : frames)
+ {
+ _channel.write(frame, promise);
+ }
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+
+ public ListenableFuture<Void> sendPipelined(final TransportFrame... frames) throws InterruptedException
+ {
+ return sendPipelined(null, frames);
}
public Response getNextResponse() throws Exception
@@ -158,8 +190,7 @@ public class FrameTransport implements A
Open open = new Open();
open.setContainerId(String.format("testContainer-%d", getConnectionId()));
- TransportFrame transportFrame = new TransportFrame((short) 0, open);
- sendPerformative(transportFrame);
+ sendPerformative(open, UnsignedShort.valueOf((short) 0));
PerformativeResponse response = (PerformativeResponse) getNextResponse();
if (!(response.getFrameBody() instanceof Open))
{
@@ -175,7 +206,7 @@ public class FrameTransport implements A
begin.setIncomingWindow(UnsignedInteger.ZERO);
begin.setOutgoingWindow(UnsignedInteger.ZERO);
_amqpChannelId = (short) 1;
- sendPerformative(new TransportFrame(_amqpChannelId, begin));
+ sendPerformative(begin, UnsignedShort.valueOf(_amqpChannelId));
PerformativeResponse response = (PerformativeResponse) getNextResponse();
if (!(response.getFrameBody() instanceof Begin))
{
@@ -206,6 +237,35 @@ public class FrameTransport implements A
assertThat(responseAttach.getSource(), is(notNullValue()));
}
+ public void doAttachSendingLink(final UnsignedInteger handle,
+ final String destination) throws Exception
+ {
+ doBeginSession();
+ Attach attach = new Attach();
+ attach.setName("testSendingLink");
+ attach.setHandle(handle);
+ attach.setRole(Role.SENDER);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ Source source = new Source();
+ attach.setSource(source);
+ Target target = new Target();
+ target.setAddress(destination);
+ attach.setTarget(target);
+
+ sendPerformative(attach);
+ PerformativeResponse response = (PerformativeResponse) getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Attach.class)));
+ Attach responseAttach = (Attach) response.getFrameBody();
+ assertThat(responseAttach.getTarget(), is(notNullValue()));
+
+
+ PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
+ assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
+ assertThat(flowResponse.getFrameBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
+ }
+
public void assertNoMoreResponses() throws Exception
{
Response response = getNextResponse();
@@ -217,7 +277,7 @@ public class FrameTransport implements A
if (_amqpConnectionId == 0)
{
_amqpConnectionId = 1;
- while (!_amqpConnectionIds.add(_amqpConnectionId))
+ while (!AMQP_CONNECTION_IDS.add(_amqpConnectionId))
{
++_amqpConnectionId;
}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+ public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+ {
+ return new BaseMatcher<Response>()
+ {
+ @Override
+ public void describeTo(final Description description)
+ {
+ description.appendValue(new HeaderResponse(expectedHeader));
+ }
+
+ @Override
+ public boolean matches(final Object o)
+ {
+ if (o == null)
+ {
+ return false;
+ }
+ if (!(o instanceof HeaderResponse))
+ {
+ return false;
+ }
+ if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getHeader()))
+ {
+ return false;
+ }
+ return true;
+ }
+ };
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertValue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+public class MessageDecoder
+{
+ private static final AMQPDescribedTypeRegistry AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer();
+ private static final SectionDecoderRegistry SECTION_DECODER_REGISTRY = AMQP_DESCRIBED_TYPE_REGISTRY
+ .getSectionDecoderRegistry();
+ private List<QpidByteBuffer> _fragments = new LinkedList<>();
+ private SectionDecoder _sectionDecoder = new SectionDecoderImpl(SECTION_DECODER_REGISTRY);
+ private HeaderSection _headerSection = null;
+ private PropertiesSection _propertiesSection = null;
+ private DeliveryAnnotationsSection _deliveryAnnotationsSection = null;
+ private MessageAnnotationsSection _messageAnnotationsSection = null;
+ private ApplicationPropertiesSection _applicationPropertiesSection = null;
+ private FooterSection _footerSection = null;
+ private List<EncodingRetainingSection<?>> _dataSections = new ArrayList<>();
+ private long _contentSize;
+ private boolean _parsed;
+
+ public void addTransfer(final Transfer transfer)
+ {
+ if (_parsed)
+ {
+ throw new IllegalStateException("The section fragments have already been parsed");
+ }
+ _fragments.addAll(transfer.getPayload());
+ }
+
+ public void parse() throws AmqpErrorException
+ {
+ if (!_parsed)
+ {
+ List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(_fragments);
+ Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+ EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
+ if (s instanceof HeaderSection)
+ {
+ _headerSection = (HeaderSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof DeliveryAnnotationsSection)
+ {
+ _deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof MessageAnnotationsSection)
+ {
+ _messageAnnotationsSection = (MessageAnnotationsSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof PropertiesSection)
+ {
+ _propertiesSection = (PropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof ApplicationPropertiesSection)
+ {
+ _applicationPropertiesSection = (ApplicationPropertiesSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s instanceof AmqpValueSection)
+ {
+ _contentSize = s.getEncodedSize();
+ _dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ else if (s instanceof DataSection)
+ {
+ do
+ {
+ _contentSize += s.getEncodedSize();
+ _dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while (s instanceof DataSection);
+ }
+ else if (s instanceof AmqpSequenceSection)
+ {
+ do
+ {
+ _contentSize += s.getEncodedSize();
+ _dataSections.add(s);
+ s = iter.hasNext() ? iter.next() : null;
+ }
+ while (s instanceof AmqpSequenceSection);
+ }
+ else
+ {
+ throw new IllegalStateException("Application data sections are not found");
+ }
+
+ if (s instanceof FooterSection)
+ {
+ _footerSection = (FooterSection) s;
+ s = iter.hasNext() ? iter.next() : null;
+ }
+
+ if (s != null)
+ {
+ throw new IllegalStateException(String.format("Encountered unexpected section '%s'",
+ s.getClass().getSimpleName()));
+ }
+ _parsed = true;
+ }
+ }
+
+
+ public Object getData() throws AmqpErrorException
+ {
+ parse();
+
+
+ Object bodyObject = null;
+ EncodingRetainingSection<?> firstBodySection = _dataSections.get(0);
+ if(firstBodySection instanceof AmqpValueSection)
+ {
+ bodyObject = convertValue(firstBodySection.getValue());
+ }
+ else if(firstBodySection instanceof DataSection)
+ {
+ int totalSize = 0;
+ for(EncodingRetainingSection<?> section : _dataSections)
+ {
+ totalSize += ((DataSection)section).getValue().getArray().length;
+ }
+ byte[] bodyData = new byte[totalSize];
+ ByteBuffer buf = ByteBuffer.wrap(bodyData);
+ for(EncodingRetainingSection<?> section : _dataSections)
+ {
+ buf.put(((DataSection) section).getValue().asByteBuffer());
+ }
+ bodyObject = bodyData;
+ }
+ else
+ {
+ ArrayList<Object> totalSequence = new ArrayList<>();
+ for(EncodingRetainingSection<?> section : _dataSections)
+ {
+ totalSequence.addAll(((AmqpSequenceSection)section).getValue());
+ }
+ bodyObject = convertValue(totalSequence);
+ }
+ return bodyObject;
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+
+public class MessageEncoder
+{
+ private static final AMQPDescribedTypeRegistry
+ AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer();
+ private List<String> _data = new LinkedList<>();
+ private SectionEncoder _encoder = new SectionEncoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY);
+
+ public void addData(final String data)
+ {
+ _data.add(data);
+ }
+
+ public List<QpidByteBuffer> getPayload()
+ {
+ List<QpidByteBuffer> payload = new ArrayList<>();
+ if (_data.isEmpty())
+ {
+ throw new IllegalStateException("Message should have at least one data section");
+ }
+
+ List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+ if (_data.size() == 1)
+ {
+ AmqpValue amqpValue = new AmqpValue(_data.get(0));
+ dataSections.add(amqpValue.createEncodingRetainingSection(_encoder));
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unsupported yet");
+ }
+
+ for (EncodingRetainingSection<?> section: dataSections)
+ {
+ payload.addAll(section.getEncodedForm());
+ section.dispose();
+ }
+
+ return payload;
+ }
+}
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java Wed Apr 12 16:26:40 2017
@@ -64,6 +64,7 @@ public class OutputHandler extends Chann
{
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(msg.asByteBuffer());
+ msg.dispose();
try
{
OutputHandler.super.write(ctx, buffer, promise);
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.messaging;
+
+import static org.apache.qpid.tests.protocol.v1_0.Matchers.protocolHeader;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.Matchers;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class TransferTest extends ProtocolTestBase
+{ // Tests that send AMQP 1.0 Transfer performatives to the broker and check the frames it sends back.
+ @Test
+ @SpecificationTest(section = "2.6.12",
+ description = "Transfering A Message.")
+ public void transfer() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+ transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
+ // Build a message whose only data item is the string "foo".
+ MessageEncoder messageEncoder = new MessageEncoder();
+ messageEncoder.addData("foo");
+ // Wrap the encoded payload in a Transfer on the same handle that was passed to the attach helper.
+ Transfer transfer = new Transfer();
+ transfer.setHandle(linkHandle);
+ transfer.setPayload(messageEncoder.getPayload());
+ // Send the transfer (settled is not set here) and read the next response from the transport.
+ transport.sendPerformative(transfer);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ // Expect a Disposition with role RECEIVER, settled == true and an Accepted state.
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+ Disposition responseDisposition = (Disposition) response.getFrameBody();
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+ // Finally ask the transport to assert that no further responses are pending.
+ transport.assertNoMoreResponses();
+ }
+ }
+ // Pipelined variant: header, open, begin, attach, a transfer marked settled, and close are handed to the transport in a single sendPipelined call.
+ @Test
+ @SpecificationTest(section = "", description = "Pipelined message send")
+ public void presettledPipelined() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ byte[] protocolHeader = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+ Open open = new Open();
+ open.setContainerId("testContainerId");
+ // Begin with next-outgoing-id and both windows set to zero.
+ Begin begin = new Begin();
+ begin.setNextOutgoingId(UnsignedInteger.ZERO);
+ begin.setIncomingWindow(UnsignedInteger.ZERO);
+ begin.setOutgoingWindow(UnsignedInteger.ZERO);
+ // Sender-role attach of "testLink" on handle 0; the Source and Target are attached with no fields set on them.
+ Attach attach = new Attach();
+ attach.setName("testLink");
+ final UnsignedInteger linkHandle = new UnsignedInteger(0);
+ attach.setHandle(linkHandle);
+ attach.setRole(Role.SENDER);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ Source source = new Source();
+ attach.setSource(source);
+ Target target = new Target();
+ attach.setTarget(target);
+ // Transfer of a "foo" message on the same handle, marked settled by the sender.
+ MessageEncoder messageEncoder = new MessageEncoder();
+ messageEncoder.addData("foo");
+ Transfer transfer = new Transfer();
+ transfer.setHandle(linkHandle);
+ transfer.setPayload(messageEncoder.getPayload());
+ transfer.setSettled(Boolean.TRUE);
+ // Close with no fields set.
+ Close close = new Close();
+ // Open and close are framed on channel 0; begin, attach and transfer share channel 37.
+ final short channel = (short) 37;
+ final ListenableFuture<Void> future = transport.sendPipelined(protocolHeader,
+ new TransportFrame((short) 0, open),
+ new TransportFrame(channel, begin),
+ new TransportFrame(channel, attach),
+ new TransportFrame(channel, transfer, transfer.getPayload()),
+ new TransportFrame((short) 0, close));
+ future.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ // The first response is matched against the same protocol header bytes that were sent.
+ final Response response = transport.getNextResponse();
+ assertThat(response, is(protocolHeader(protocolHeader)));
+ // Then Open, Begin, Attach and Flow performatives are expected, in that order.
+ final PerformativeResponse openResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(openResponse, is(notNullValue()));
+ assertThat(openResponse.getFrameBody(), is(instanceOf(Open.class)));
+ final PerformativeResponse beginResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(beginResponse, is(notNullValue()));
+ assertThat(beginResponse.getFrameBody(), is(instanceOf(Begin.class)));
+ final PerformativeResponse attachResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(attachResponse, is(notNullValue()));
+ assertThat(attachResponse.getFrameBody(), is(instanceOf(Attach.class)));
+ final PerformativeResponse flowResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(flowResponse, is(notNullValue()));
+ assertThat(flowResponse.getFrameBody(), is(instanceOf(Flow.class)));
+/* NOTE(review): the Disposition, Detach and End assertions below are disabled; the reason is not recorded here - confirm before re-enabling.
+ final PerformativeResponse dispositionResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(dispositionResponse, is(notNullValue()));
+ assertThat(dispositionResponse.getFrameBody(), is(instanceOf(Disposition.class)));
+ final PerformativeResponse detachResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(detachResponse, is(notNullValue()));
+ assertThat(detachResponse.getFrameBody(), is(instanceOf(Detach.class)));
+ final PerformativeResponse endResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(endResponse, is(notNullValue()));
+ assertThat(endResponse.getFrameBody(), is(instanceOf(End.class)));
+*/
+ final PerformativeResponse closeResponse = (PerformativeResponse) transport.getNextResponse();
+ assertThat(closeResponse, is(notNullValue()));
+ assertThat(closeResponse.getFrameBody(), is(instanceOf(Close.class)));
+ transport.assertNoMoreResponses();
+ }
+ }
+}
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java Wed Apr 12 16:26:40 2017
@@ -59,7 +59,7 @@ public class ProtocolHeaderTest extends
public void successfulHeaderExchange() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try(FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr))
{
byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
transport.sendProtocolHeader(bytes);
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java Wed Apr 12 16:26:40 2017
@@ -32,8 +32,6 @@ import static org.hamcrest.Matchers.notN
import java.net.InetSocketAddress;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -60,8 +58,7 @@ public class OpenTest extends ProtocolTe
{
transport.doProtocolNegotiation();
Open open = new Open();
- TransportFrame transportFrame = new TransportFrame((short) 0, open);
- transport.sendPerformative(transportFrame);
+ transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
assertThat(response, is(notNullValue()));
@@ -84,8 +81,7 @@ public class OpenTest extends ProtocolTe
transport.doProtocolNegotiation();
Open open = new Open();
open.setContainerId("testContainerId");
- TransportFrame transportFrame = new TransportFrame((short) 0, open);
- transport.sendPerformative(transportFrame);
+ transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
assertThat(response, is(notNullValue()));
@@ -110,8 +106,7 @@ public class OpenTest extends ProtocolTe
transport.doProtocolNegotiation();
Open open = new Open();
open.setContainerId("testContainerId");
- TransportFrame transportFrame = new TransportFrame((short) 1, open);
- transport.sendPerformative(transportFrame);
+ transport.sendPerformative(open, UnsignedShort.valueOf((short) 1));
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
assertThat(response, is(notNullValue()));
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java Wed Apr 12 16:26:40 2017
@@ -77,12 +77,11 @@ public class AttachTest extends Protocol
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr))
{
- Role localRole = Role.SENDER;
transport.doBeginSession();
Attach attach = new Attach();
attach.setName("testLink");
attach.setHandle(new UnsignedInteger(0));
- attach.setRole(localRole);
+ attach.setRole(Role.SENDER);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
Source source = new Source();
attach.setSource(source);
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class FlowTest extends ProtocolTestBase
+{ // Tests that send AMQP 1.0 Flow performatives after attaching a receiving link and check the broker's next frame(s).
+ @Test
+ @SpecificationTest(section = "1.3.4",
+ description = "Flow without mandatory fields should result in a decoding error.")
+ public void emptyFlow() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+ Flow flow = new Flow();
+ // No fields are set on this Flow before it is sent.
+ transport.sendPerformative(flow);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ // Expect a Close carrying an error whose condition is AmqpError.DECODE_ERROR.
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Close.class)));
+ Close responseClose = (Close) response.getFrameBody();
+ assertThat(responseClose.getError(), is(notNullValue()));
+ assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
+ }
+ }
+ // Echo case: a Flow sent with echo=true is expected to be answered by a Flow whose own echo flag is false.
+ @Test
+ @SpecificationTest(section = "2.7.4",
+ description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
+ public void echoFlow() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+ Flow flow = new Flow();
+ flow.setIncomingWindow(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ZERO);
+ flow.setOutgoingWindow(UnsignedInteger.ZERO);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setEcho(Boolean.TRUE);
+ // Send the flow (no handle or link credit is set on it) and read the next response.
+ transport.sendPerformative(flow);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ // The reply must itself be a Flow, and its echo field must equal Boolean.FALSE.
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+ Flow responseFlow = (Flow) response.getFrameBody();
+ assertThat(responseFlow.getEcho(), is(equalTo(Boolean.FALSE)));
+ }
+ }
+ // Synchronous get: "foo" is put on the queue first, then a Flow carrying one link credit is sent and the resulting Transfer(s) are collected.
+ @Test
+ @SpecificationTest(section = "2.6.8",
+ description = "A synchronous get of a message from a link is accomplished by incrementing the link-credit,"
+ + " sending the updated flow state, and waiting indefinitely for a transfer to arrive.")
+ public void synchronousGet() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "foo");
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+ Flow flow = new Flow();
+ flow.setIncomingWindow(UnsignedInteger.ONE);
+ flow.setNextIncomingId(UnsignedInteger.ZERO);
+ flow.setOutgoingWindow(UnsignedInteger.ZERO);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setHandle(UnsignedInteger.ZERO); // TODO (left unexplained by the author). NOTE(review): handle is hard-coded to zero - confirm it matches the handle doAttachReceivingLink uses.
+ flow.setLinkCredit(UnsignedInteger.ONE);
+ // Send the flow carrying the single link credit.
+ transport.sendPerformative(flow);
+ // Feed every received Transfer into the decoder, looping only while a transfer's 'more' flag is true.
+ MessageDecoder messageDecoder = new MessageDecoder();
+ boolean hasMore;
+ do
+ {
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Transfer.class)));
+ Transfer responseTransfer = (Transfer) response.getFrameBody();
+ messageDecoder.addTransfer(responseTransfer);
+ hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+ }
+ while (hasMore);
+ // The decoded message data must equal the string that was put on the queue.
+ String data = (String) messageDecoder.getData();
+ assertThat(data, is(equalTo("foo")));
+ }
+ }
+}
Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java Wed Apr 12 16:26:40 2017
@@ -52,9 +52,7 @@ public class BeginTest extends ProtocolT
{
transport.doOpenConnection();
Begin begin = new Begin();
- short channel = (short) 37;
- TransportFrame transportFrame = new TransportFrame(channel, begin);
- transport.sendPerformative(transportFrame);
+ transport.sendPerformative(begin, UnsignedShort.valueOf((short) 37));
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
assertThat(response, is(notNullValue()));
@@ -81,8 +79,7 @@ public class BeginTest extends ProtocolT
begin.setOutgoingWindow(UnsignedInteger.ZERO);
UnsignedShort channel = UnsignedShort.valueOf((short) 37);
- TransportFrame transportFrame = new TransportFrame(channel.shortValue(), begin);
- transport.sendPerformative(transportFrame);
+ transport.sendPerformative(begin, channel);
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
assertThat(response, is(notNullValue()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org