You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/04/12 16:26:40 UTC

svn commit: r1791140 - in /qpid/java/trunk/systests/protocol-tests-amqp-1-0/src: main/java/org/apache/qpid/tests/protocol/v1_0/ test/java/org/apache/qpid/tests/protocol/v1_0/messaging/ test/java/org/apache/qpid/tests/protocol/v1_0/transport/ test/java/...

Author: lquack
Date: Wed Apr 12 16:26:40 2017
New Revision: 1791140

URL: http://svn.apache.org/viewvc?rev=1791140&view=rev
Log:
QPID-7665: [Java Broker] Improvements to AMQP 1.0 Protocol Tests

 * add FlowTest and TransferTest

Added:
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
Modified:
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
    qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java Wed Apr 12 16:26:40 2017
@@ -26,6 +26,8 @@ import org.apache.qpid.server.plugin.Plu
 
 public interface BrokerAdmin extends Pluggable
 {
+    String TEST_QUEUE_NAME = "testQueue";
+
     void beforeTestClass(final Class testClass);
     void beforeTestMethod(final Class testClass, final Method method);
     void afterTestMethod(final Class testClass, final Method method);
@@ -35,6 +37,7 @@ public interface BrokerAdmin extends Plu
 
     void createQueue(String queueName);
     void deleteQueue(String queueName);
+    void putMessageOnQueue(String queueName, String... messages);
 
     enum PortType
     {

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java Wed Apr 12 16:26:40 2017
@@ -43,6 +43,7 @@ import org.apache.qpid.server.logging.lo
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Container;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManageableMessage;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Queue;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.store.MemoryConfigurationStore;
 import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
 
 @PluggableService
@@ -144,6 +146,11 @@ public class EmbeddedBrokerPerClassAdmin
         {
             _currentVirtualHostNode.delete();
         }
+        else
+        {
+            _currentVirtualHostNode.setAttributes(Collections.singletonMap(VirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE,
+                                                                           false));
+        }
     }
 
     @Override
@@ -185,6 +192,101 @@ public class EmbeddedBrokerPerClassAdmin
     }
 
     @Override
+    public void putMessageOnQueue(final String queueName, final String... messages)
+    {
+        for (String message : messages)
+        {
+            ((QueueManagingVirtualHost<?>) _currentVirtualHostNode.getVirtualHost()).publishMessage(new ManageableMessage()
+            {
+                @Override
+                public String getAddress()
+                {
+                    return queueName;
+                }
+
+                @Override
+                public boolean isPersistent()
+                {
+                    return false;
+                }
+
+                @Override
+                public Date getExpiration()
+                {
+                    return null;
+                }
+
+                @Override
+                public String getCorrelationId()
+                {
+                    return null;
+                }
+
+                @Override
+                public String getAppId()
+                {
+                    return null;
+                }
+
+                @Override
+                public String getMessageId()
+                {
+                    return null;
+                }
+
+                @Override
+                public String getMimeType()
+                {
+                    return "text/plain";
+                }
+
+                @Override
+                public String getEncoding()
+                {
+                    return null;
+                }
+
+                @Override
+                public int getPriority()
+                {
+                    return 0;
+                }
+
+                @Override
+                public Date getNotValidBefore()
+                {
+                    return null;
+                }
+
+                @Override
+                public String getReplyTo()
+                {
+                    return null;
+                }
+
+                @Override
+                public Map<String, Object> getHeaders()
+                {
+                    return null;
+                }
+
+                @Override
+                public Object getContent()
+                {
+                    return message;
+                }
+
+                @Override
+                public String getContentTransferEncoding()
+                {
+                    return null;
+                }
+            });
+        }
+
+    }
+
+    @Override
     public String getType()
     {
         return "EMBEDDED_BROKER_PER_CLASS";

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java Wed Apr 12 16:26:40 2017
@@ -92,6 +92,12 @@ public class ExternalQpidBrokerAdminImpl
     }
 
     @Override
+    public void putMessageOnQueue(final String queueName, final String... messages)
+    {
+        LOGGER.debug(String.format("puting of %d messages on queue '%s' requested", messages.length, queueName));
+    }
+
+    @Override
     public String getType()
     {
         return "EXTERNAL_BROKER";

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java Wed Apr 12 16:26:40 2017
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -44,25 +45,32 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.core.Is;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class FrameTransport implements AutoCloseable
 {
-    private static final long RESPONSE_TIMEOUT = 6000;
-    private static final Set<Integer> _amqpConnectionIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    public static final long RESPONSE_TIMEOUT = 6000;
 
     private final Channel _channel;
     private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
@@ -108,7 +116,7 @@ public class FrameTransport implements A
         }
         finally
         {
-            _amqpConnectionIds.remove(_amqpConnectionId);
+            AMQP_CONNECTION_IDS.remove(_amqpConnectionId);
             _workerGroup.shutdownGracefully();
         }
     }
@@ -122,8 +130,10 @@ public class FrameTransport implements A
         return JdkFutureAdapters.listenInPoolThread(channelFuture);
     }
 
-    public ListenableFuture<Void> sendPerformative(final TransportFrame transportFrame) throws Exception
+    public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
     {
+        final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer)frameBody).getPayload() : null;
+        TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
         ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
         channelFuture.sync();
         return JdkFutureAdapters.listenInPoolThread(channelFuture);
@@ -131,8 +141,30 @@ public class FrameTransport implements A
 
     public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
     {
-        TransportFrame transportFrame = new TransportFrame(_amqpChannelId, frameBody);
-        return sendPerformative(transportFrame);
+        return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
+    }
+
+    public ListenableFuture<Void> sendPipelined(final byte[] protocolHeader, final TransportFrame... frames)
+            throws InterruptedException
+    {
+        ChannelPromise promise = _channel.newPromise();
+        if (protocolHeader != null)
+        {
+            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+            buffer.writeBytes(protocolHeader);
+            _channel.write(buffer);
+        }
+        for (TransportFrame frame : frames)
+        {
+            _channel.write(frame, promise);
+        }
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+
+    public ListenableFuture<Void> sendPipelined(final TransportFrame... frames) throws InterruptedException
+    {
+        return sendPipelined(null, frames);
     }
 
     public Response getNextResponse() throws Exception
@@ -158,8 +190,7 @@ public class FrameTransport implements A
         Open open = new Open();
 
         open.setContainerId(String.format("testContainer-%d", getConnectionId()));
-        TransportFrame transportFrame = new TransportFrame((short) 0, open);
-        sendPerformative(transportFrame);
+        sendPerformative(open, UnsignedShort.valueOf((short) 0));
         PerformativeResponse response = (PerformativeResponse) getNextResponse();
         if (!(response.getFrameBody() instanceof Open))
         {
@@ -175,7 +206,7 @@ public class FrameTransport implements A
         begin.setIncomingWindow(UnsignedInteger.ZERO);
         begin.setOutgoingWindow(UnsignedInteger.ZERO);
         _amqpChannelId = (short) 1;
-        sendPerformative(new TransportFrame(_amqpChannelId, begin));
+        sendPerformative(begin, UnsignedShort.valueOf(_amqpChannelId));
         PerformativeResponse response = (PerformativeResponse) getNextResponse();
         if (!(response.getFrameBody() instanceof Begin))
         {
@@ -206,6 +237,35 @@ public class FrameTransport implements A
         assertThat(responseAttach.getSource(), is(notNullValue()));
     }
 
+    public void doAttachSendingLink(final UnsignedInteger handle,
+                                    final String destination) throws Exception
+    {
+        doBeginSession();
+        Attach attach = new Attach();
+        attach.setName("testSendingLink");
+        attach.setHandle(handle);
+        attach.setRole(Role.SENDER);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        Source source = new Source();
+        attach.setSource(source);
+        Target target = new Target();
+        target.setAddress(destination);
+        attach.setTarget(target);
+
+        sendPerformative(attach);
+        PerformativeResponse response = (PerformativeResponse) getNextResponse();
+
+        assertThat(response, is(notNullValue()));
+        assertThat(response.getFrameBody(), is(instanceOf(Attach.class)));
+        Attach responseAttach = (Attach) response.getFrameBody();
+        assertThat(responseAttach.getTarget(), is(notNullValue()));
+
+
+        PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
+        assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
+        assertThat(flowResponse.getFrameBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
+    }
+
     public void assertNoMoreResponses() throws Exception
     {
         Response response = getNextResponse();
@@ -217,7 +277,7 @@ public class FrameTransport implements A
         if (_amqpConnectionId == 0)
         {
             _amqpConnectionId = 1;
-            while (!_amqpConnectionIds.add(_amqpConnectionId))
+            while (!AMQP_CONNECTION_IDS.add(_amqpConnectionId))
             {
                 ++_amqpConnectionId;
             }

Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+    {
+        return new BaseMatcher<Response>()
+        {
+            @Override
+            public void describeTo(final Description description)
+            {
+                description.appendValue(new HeaderResponse(expectedHeader));
+            }
+
+            @Override
+            public boolean matches(final Object o)
+            {
+                if (o == null)
+                {
+                    return false;
+                }
+                if (!(o instanceof HeaderResponse))
+                {
+                    return false;
+                }
+                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getHeader()))
+                {
+                    return false;
+                }
+                return true;
+            }
+        };
+    }
+}

Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertValue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+public class MessageDecoder
+{
+    private static final AMQPDescribedTypeRegistry AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                                          .registerTransportLayer()
+                                                                                                          .registerMessagingLayer();
+    private static final SectionDecoderRegistry SECTION_DECODER_REGISTRY = AMQP_DESCRIBED_TYPE_REGISTRY
+                                                                                                    .getSectionDecoderRegistry();
+    private List<QpidByteBuffer> _fragments = new LinkedList<>();
+    private SectionDecoder _sectionDecoder = new SectionDecoderImpl(SECTION_DECODER_REGISTRY);
+    private HeaderSection _headerSection = null;
+    private PropertiesSection _propertiesSection = null;
+    private DeliveryAnnotationsSection _deliveryAnnotationsSection = null;
+    private MessageAnnotationsSection _messageAnnotationsSection = null;
+    private ApplicationPropertiesSection _applicationPropertiesSection = null;
+    private FooterSection _footerSection = null;
+    private List<EncodingRetainingSection<?>> _dataSections = new ArrayList<>();
+    private long _contentSize;
+    private boolean _parsed;
+
+    public void addTransfer(final Transfer transfer)
+    {
+        if (_parsed)
+        {
+            throw new IllegalStateException("The section fragments have already been parsed");
+        }
+        _fragments.addAll(transfer.getPayload());
+    }
+
+    public void parse() throws AmqpErrorException
+    {
+        if (!_parsed)
+        {
+            List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(_fragments);
+            Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+            EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
+            if (s instanceof HeaderSection)
+            {
+                _headerSection = (HeaderSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s instanceof DeliveryAnnotationsSection)
+            {
+                _deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s instanceof MessageAnnotationsSection)
+            {
+                _messageAnnotationsSection = (MessageAnnotationsSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s instanceof PropertiesSection)
+            {
+                _propertiesSection = (PropertiesSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s instanceof ApplicationPropertiesSection)
+            {
+                _applicationPropertiesSection = (ApplicationPropertiesSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s instanceof AmqpValueSection)
+            {
+                _contentSize = s.getEncodedSize();
+                _dataSections.add(s);
+                s = iter.hasNext() ? iter.next() : null;
+            }
+            else if (s instanceof DataSection)
+            {
+                do
+                {
+                    _contentSize += s.getEncodedSize();
+                    _dataSections.add(s);
+                    s = iter.hasNext() ? iter.next() : null;
+                }
+                while (s instanceof DataSection);
+            }
+            else if (s instanceof AmqpSequenceSection)
+            {
+                do
+                {
+                    _contentSize += s.getEncodedSize();
+                    _dataSections.add(s);
+                    s = iter.hasNext() ? iter.next() : null;
+                }
+                while (s instanceof AmqpSequenceSection);
+            }
+            else
+            {
+                throw new IllegalStateException("Application data sections are not found");
+            }
+
+            if (s instanceof FooterSection)
+            {
+                _footerSection = (FooterSection) s;
+                s = iter.hasNext() ? iter.next() : null;
+            }
+
+            if (s != null)
+            {
+                throw new IllegalStateException(String.format("Encountered unexpected section '%s'",
+                                                              s.getClass().getSimpleName()));
+            }
+            _parsed = true;
+        }
+    }
+
+
+    public Object getData() throws AmqpErrorException
+    {
+        parse();
+
+
+        Object bodyObject = null;
+        EncodingRetainingSection<?> firstBodySection = _dataSections.get(0);
+        if(firstBodySection instanceof AmqpValueSection)
+        {
+            bodyObject = convertValue(firstBodySection.getValue());
+        }
+        else if(firstBodySection instanceof DataSection)
+        {
+            int totalSize = 0;
+            for(EncodingRetainingSection<?> section : _dataSections)
+            {
+                totalSize += ((DataSection)section).getValue().getArray().length;
+            }
+            byte[] bodyData = new byte[totalSize];
+            ByteBuffer buf = ByteBuffer.wrap(bodyData);
+            for(EncodingRetainingSection<?> section : _dataSections)
+            {
+                buf.put(((DataSection) section).getValue().asByteBuffer());
+            }
+            bodyObject = bodyData;
+        }
+        else
+        {
+            ArrayList<Object> totalSequence = new ArrayList<>();
+            for(EncodingRetainingSection<?> section : _dataSections)
+            {
+                totalSequence.addAll(((AmqpSequenceSection)section).getValue());
+            }
+            bodyObject = convertValue(totalSequence);
+        }
+        return bodyObject;
+    }
+}

Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+
+public class MessageEncoder
+{
+    private static final AMQPDescribedTypeRegistry
+            AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                    .registerTransportLayer()
+                                                                    .registerMessagingLayer();
+    private List<String> _data = new LinkedList<>();
+    private SectionEncoder _encoder = new SectionEncoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY);
+
+    public void addData(final String data)
+    {
+        _data.add(data);
+    }
+
+    public List<QpidByteBuffer> getPayload()
+    {
+        List<QpidByteBuffer> payload = new ArrayList<>();
+        if (_data.isEmpty())
+        {
+            throw new IllegalStateException("Message should have at least one data section");
+        }
+
+        List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
+        if (_data.size() == 1)
+        {
+            AmqpValue amqpValue = new AmqpValue(_data.get(0));
+            dataSections.add(amqpValue.createEncodingRetainingSection(_encoder));
+        }
+        else
+        {
+            throw new UnsupportedOperationException("Unsupported yet");
+        }
+
+        for (EncodingRetainingSection<?> section: dataSections)
+        {
+            payload.addAll(section.getEncodedForm());
+            section.dispose();
+        }
+
+        return payload;
+    }
+}

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java Wed Apr 12 16:26:40 2017
@@ -64,6 +64,7 @@ public class OutputHandler extends Chann
                 {
                     ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
                     buffer.writeBytes(msg.asByteBuffer());
+                    msg.dispose();
                     try
                     {
                         OutputHandler.super.write(ctx, buffer, promise);

Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.messaging;
+
+import static org.apache.qpid.tests.protocol.v1_0.Matchers.protocolHeader;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.Matchers;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class TransferTest extends ProtocolTestBase
+{
+    @Test
+    @SpecificationTest(section = "2.6.12",
+            description = "Transfering A Message.")
+    public void transfer() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            messageEncoder.addData("foo");
+
+            Transfer transfer = new Transfer();
+            transfer.setHandle(linkHandle);
+            transfer.setPayload(messageEncoder.getPayload());
+
+            transport.sendPerformative(transfer);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+            Disposition responseDisposition = (Disposition) response.getFrameBody();
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+
+            transport.assertNoMoreResponses();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "", description = "Pipelined message send")
+    public void presettledPipelined() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            byte[] protocolHeader = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+            Open open = new Open();
+            open.setContainerId("testContainerId");
+
+            Begin begin = new Begin();
+            begin.setNextOutgoingId(UnsignedInteger.ZERO);
+            begin.setIncomingWindow(UnsignedInteger.ZERO);
+            begin.setOutgoingWindow(UnsignedInteger.ZERO);
+
+            Attach attach = new Attach();
+            attach.setName("testLink");
+            final UnsignedInteger linkHandle = new UnsignedInteger(0);
+            attach.setHandle(linkHandle);
+            attach.setRole(Role.SENDER);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            Source source = new Source();
+            attach.setSource(source);
+            Target target = new Target();
+            attach.setTarget(target);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            messageEncoder.addData("foo");
+            Transfer transfer = new Transfer();
+            transfer.setHandle(linkHandle);
+            transfer.setPayload(messageEncoder.getPayload());
+            transfer.setSettled(Boolean.TRUE);
+
+            Close close = new Close();
+
+            final short channel = (short) 37;
+            final ListenableFuture<Void> future = transport.sendPipelined(protocolHeader,
+                                                                          new TransportFrame((short) 0, open),
+                                                                          new TransportFrame(channel, begin),
+                                                                          new TransportFrame(channel, attach),
+                                                                          new TransportFrame(channel, transfer, transfer.getPayload()),
+                                                                          new TransportFrame((short) 0, close));
+            future.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+
+            final Response response = transport.getNextResponse();
+            assertThat(response, is(protocolHeader(protocolHeader)));
+
+            final PerformativeResponse openResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(openResponse, is(notNullValue()));
+            assertThat(openResponse.getFrameBody(), is(instanceOf(Open.class)));
+            final PerformativeResponse beginResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(beginResponse, is(notNullValue()));
+            assertThat(beginResponse.getFrameBody(), is(instanceOf(Begin.class)));
+            final PerformativeResponse attachResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(attachResponse, is(notNullValue()));
+            assertThat(attachResponse.getFrameBody(), is(instanceOf(Attach.class)));
+            final PerformativeResponse flowResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(flowResponse, is(notNullValue()));
+            assertThat(flowResponse.getFrameBody(), is(instanceOf(Flow.class)));
+/*
+            final PerformativeResponse dispositionResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(dispositionResponse, is(notNullValue()));
+            assertThat(dispositionResponse.getFrameBody(), is(instanceOf(Disposition.class)));
+            final PerformativeResponse detachResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(detachResponse, is(notNullValue()));
+            assertThat(detachResponse.getFrameBody(), is(instanceOf(Detach.class)));
+            final PerformativeResponse endResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(endResponse, is(notNullValue()));
+            assertThat(endResponse.getFrameBody(), is(instanceOf(End.class)));
+*/
+            final PerformativeResponse closeResponse = (PerformativeResponse) transport.getNextResponse();
+            assertThat(closeResponse, is(notNullValue()));
+            assertThat(closeResponse.getFrameBody(), is(instanceOf(Close.class)));
+            transport.assertNoMoreResponses();
+        }
+    }
+}

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java Wed Apr 12 16:26:40 2017
@@ -59,7 +59,7 @@ public class ProtocolHeaderTest extends
     public void successfulHeaderExchange() throws Exception
     {
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
-        try(FrameTransport transport = new FrameTransport(addr))
+        try (FrameTransport transport = new FrameTransport(addr))
         {
             byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
             transport.sendProtocolHeader(bytes);

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java Wed Apr 12 16:26:40 2017
@@ -32,8 +32,6 @@ import static org.hamcrest.Matchers.notN
 import java.net.InetSocketAddress;
 
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -60,8 +58,7 @@ public class OpenTest extends ProtocolTe
         {
             transport.doProtocolNegotiation();
             Open open = new Open();
-            TransportFrame transportFrame = new TransportFrame((short) 0, open);
-            transport.sendPerformative(transportFrame);
+            transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
 
             assertThat(response, is(notNullValue()));
@@ -84,8 +81,7 @@ public class OpenTest extends ProtocolTe
             transport.doProtocolNegotiation();
             Open open = new Open();
             open.setContainerId("testContainerId");
-            TransportFrame transportFrame = new TransportFrame((short) 0, open);
-            transport.sendPerformative(transportFrame);
+            transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
 
             assertThat(response, is(notNullValue()));
@@ -110,8 +106,7 @@ public class OpenTest extends ProtocolTe
             transport.doProtocolNegotiation();
             Open open = new Open();
             open.setContainerId("testContainerId");
-            TransportFrame transportFrame = new TransportFrame((short) 1, open);
-            transport.sendPerformative(transportFrame);
+            transport.sendPerformative(open, UnsignedShort.valueOf((short) 1));
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
 
             assertThat(response, is(notNullValue()));

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java Wed Apr 12 16:26:40 2017
@@ -77,12 +77,11 @@ public class AttachTest extends Protocol
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr))
         {
-            Role localRole = Role.SENDER;
             transport.doBeginSession();
             Attach attach = new Attach();
             attach.setName("testLink");
             attach.setHandle(new UnsignedInteger(0));
-            attach.setRole(localRole);
+            attach.setRole(Role.SENDER);
             attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
             Source source = new Source();
             attach.setSource(source);

Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java?rev=1791140&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java Wed Apr 12 16:26:40 2017
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class FlowTest extends ProtocolTestBase
+{
+    @Test
+    @SpecificationTest(section = "1.3.4",
+            description = "Flow without mandatory fields should result in a decoding error.")
+    public void emptyFlow() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+            Flow flow = new Flow();
+
+            transport.sendPerformative(flow);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Close.class)));
+            Close responseClose = (Close) response.getFrameBody();
+            assertThat(responseClose.getError(), is(notNullValue()));
+            assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.4",
+            description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
+    public void echoFlow() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+            Flow flow = new Flow();
+            flow.setIncomingWindow(UnsignedInteger.ZERO);
+            flow.setNextIncomingId(UnsignedInteger.ZERO);
+            flow.setOutgoingWindow(UnsignedInteger.ZERO);
+            flow.setNextOutgoingId(UnsignedInteger.ZERO);
+            flow.setEcho(Boolean.TRUE);
+
+            transport.sendPerformative(flow);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+            Flow responseFlow = (Flow) response.getFrameBody();
+            assertThat(responseFlow.getEcho(), is(equalTo(Boolean.FALSE)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.8",
+            description = "A synchronous get of a message from a link is accomplished by incrementing the link-credit,"
+                          + " sending the updated flow state, and waiting indefinitely for a transfer to arrive.")
+    public void synchronousGet() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "foo");
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+            Flow flow = new Flow();
+            flow.setIncomingWindow(UnsignedInteger.ONE);
+            flow.setNextIncomingId(UnsignedInteger.ZERO);
+            flow.setOutgoingWindow(UnsignedInteger.ZERO);
+            flow.setNextOutgoingId(UnsignedInteger.ZERO);
+            flow.setHandle(UnsignedInteger.ZERO); // TODO
+            flow.setLinkCredit(UnsignedInteger.ONE);
+
+            transport.sendPerformative(flow);
+
+            MessageDecoder messageDecoder = new MessageDecoder();
+            boolean hasMore;
+            do
+            {
+                PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+                assertThat(response, is(notNullValue()));
+                assertThat(response.getFrameBody(), is(instanceOf(Transfer.class)));
+                Transfer responseTransfer = (Transfer) response.getFrameBody();
+                messageDecoder.addTransfer(responseTransfer);
+                hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+            }
+            while (hasMore);
+
+            String data = (String) messageDecoder.getData();
+            assertThat(data, is(equalTo("foo")));
+        }
+    }
+}

Modified: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java?rev=1791140&r1=1791139&r2=1791140&view=diff
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java Wed Apr 12 16:26:40 2017
@@ -52,9 +52,7 @@ public class BeginTest extends ProtocolT
         {
             transport.doOpenConnection();
             Begin begin = new Begin();
-            short channel = (short) 37;
-            TransportFrame transportFrame = new TransportFrame(channel, begin);
-            transport.sendPerformative(transportFrame);
+            transport.sendPerformative(begin, UnsignedShort.valueOf((short) 37));
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
 
             assertThat(response, is(notNullValue()));
@@ -81,8 +79,7 @@ public class BeginTest extends ProtocolT
             begin.setOutgoingWindow(UnsignedInteger.ZERO);
 
             UnsignedShort channel = UnsignedShort.valueOf((short) 37);
-            TransportFrame transportFrame = new TransportFrame(channel.shortValue(), begin);
-            transport.sendPerformative(transportFrame);
+            transport.sendPerformative(begin, channel);
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
 
             assertThat(response, is(notNullValue()));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org