You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/22 06:36:09 UTC

[2/2] qpid-broker-j git commit: QPID-7793: Perform contiguous frames for the same channel under the same access controller context

QPID-7793: Process contiguous frames for the same channel under the same access controller context


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1b5d9ad1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1b5d9ad1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1b5d9ad1

Branch: refs/heads/master
Commit: 1b5d9ad134be779cd2dfa91b7585264c44a2aa32
Parents: 17925e5
Author: Keith Wall <ke...@gmail.com>
Authored: Mon May 22 07:34:09 2017 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon May 22 07:35:31 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   | 125 ++++++++++---------
 .../server/protocol/v1_0/ConnectionHandler.java |   5 +-
 .../protocol/v1_0/framing/FrameHandler.java     |  31 +++--
 .../v1_0/type/transport/ChannelFrameBody.java   |  27 ++++
 .../qpid/tests/protocol/v1_0/InputHandler.java  |  41 +++---
 .../protocol/v1_0/messaging/TransferTest.java   |   2 +-
 .../v1_0/transport/session/BeginTest.java       |  41 +++++-
 7 files changed, 175 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 809fb58..c5ba4a4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +90,7 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
@@ -295,15 +299,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveAttach(attach);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            session.receiveAttach(attach);
         }
         else
         {
@@ -312,16 +308,61 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public void receive(final short channel, final Object frame)
+    public void receive(final List<ChannelFrameBody> channelFrameBodies)
     {
-        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
-        if (frame instanceof FrameBody)
+        PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator());
+
+        while(itr.hasNext())
         {
-            ((FrameBody) frame).invoke(channel, this);
+            final ChannelFrameBody channelFrameBody = itr.next();
+            final int frameChannel = channelFrameBody.getChannel();
+
+            Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length ? null : _receivingSessions[frameChannel];
+            if (session != null)
+            {
+                final AccessControlContext context = session.getAccessControllerContext();
+                AccessController.doPrivileged((PrivilegedAction<Void>) () ->
+                {
+                    ChannelFrameBody channelFrame = channelFrameBody;
+                    boolean nextIsSameChannel;
+                    do
+                    {
+                        received(frameChannel, channelFrame.getFrameBody());
+                        nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel();
+                        if (nextIsSameChannel)
+                        {
+                            channelFrame = itr.next();
+                        }
+                    }
+                    while (nextIsSameChannel);
+                    return null;
+                }, context);
+            }
+            else
+            {
+                received(frameChannel, channelFrameBody.getFrameBody());
+            }
+        }
+    }
+
+    private void received(int channel, Object val)
+    {
+        if (channel > getChannelMax())
+        {
+            Error error = new Error(ConnectionError.FRAMING_ERROR,
+                                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
+            handleError(error);
+            return;
         }
-        else if (frame instanceof SaslFrameBody)
+
+        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, val);
+        if (val instanceof FrameBody)
+        {
+            ((FrameBody) val).invoke((short) channel, this);
+        }
+        else if (val instanceof SaslFrameBody)
         {
-            ((SaslFrameBody) frame).invoke(channel, this);
+            ((SaslFrameBody) val).invoke((short) channel, this);
         }
     }
 
@@ -541,17 +582,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    _receivingSessions[channel] = null;
-
-                    session.receiveEnd(end);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            _receivingSessions[channel] = null;
+            session.receiveEnd(end);
         }
         else
         {
@@ -571,15 +603,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveDisposition(disposition);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            session.receiveDisposition(disposition);
         }
         else
         {
@@ -637,6 +661,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
 
                     synchronized (_blockingLock)
                     {
+
                         _sessions.add(session);
                         if (_blocking)
                         {
@@ -686,15 +711,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveTransfer(transfer);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            session.receiveTransfer(transfer);
         }
         else
         {
@@ -708,15 +725,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveFlow(flow);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            session.receiveFlow(flow);
         }
         else
         {
@@ -897,15 +906,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         final Session_1_0 session = getSession(channel);
         if (session != null)
         {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveDetach(detach);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
+            session.receiveDetach(detach);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
index 549fe4b..03a3794 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
@@ -59,5 +62,5 @@ public interface ConnectionHandler extends SASLEndpoint
 
     boolean closedForInput();
 
-    void receive(short channel, Object val);
+    void receive(List<ChannelFrameBody> channelFrameBodies);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index 200979a..f8e84f3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -20,8 +20,10 @@
  */
 package org.apache.qpid.server.protocol.v1_0.framing;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Formatter;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -60,6 +63,7 @@ public class FrameHandler implements ProtocolHandler
             int size;
 
             int remaining;
+            List<ChannelFrameBody> channelFrameBodies = new ArrayList<>();
             while ((remaining = in.remaining()) >=8 && frameParsingError == null)
             {
 
@@ -133,16 +137,7 @@ public class FrameHandler implements ProtocolHandler
                     break;
                 }
 
-                int channel = ((int)in.getShort()) & 0xffff;
-
-                if (channel > _connectionHandler.getChannelMax())
-                {
-                    frameParsingError = createFramingError(
-                            "specified channel %d larger than maximum channel %d",
-                            channel,
-                            _connectionHandler.getChannelMax());
-                    break;
-                }
+                int channel = in.getUnsignedShort();
 
                 if (dataOffset != 8)
                 {
@@ -173,7 +168,20 @@ public class FrameHandler implements ProtocolHandler
                                     val);
                         }
                     }
-                    _connectionHandler.receive((short) channel, val);
+                    channelFrameBodies.add(new ChannelFrameBody()
+                    {
+                        @Override
+                        public int getChannel()
+                        {
+                            return channel;
+                        }
+
+                        @Override
+                        public Object getFrameBody()
+                        {
+                            return val;
+                        }
+                    });
                 }
                 catch (AmqpErrorException ex)
                 {
@@ -184,6 +192,7 @@ public class FrameHandler implements ProtocolHandler
                     dup.dispose();
                 }
             }
+            _connectionHandler.receive(channelFrameBodies);
 
             if (frameParsingError != null)
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
new file mode 100644
index 0000000..b6ce019
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type.transport;
+
+public interface ChannelFrameBody
+{
+    int getChannel();
+    Object getFrameBody();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
index 770779e..cb5a656 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 import io.netty.buffer.ByteBuf;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
@@ -137,13 +139,6 @@ public class InputHandler extends ChannelInboundHandlerAdapter
         }
     }
 
-    @Override
-    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception
-    {
-        LOGGER.debug("KWDEBUG channelReadComplete");
-        super.channelReadComplete(ctx);
-    }
-
     private class MyConnectionHandler implements ConnectionHandler
     {
         @Override
@@ -225,23 +220,27 @@ public class InputHandler extends ChannelInboundHandlerAdapter
         }
 
         @Override
-        public void receive(final short channel, final Object val)
+        public void receive(final List<ChannelFrameBody> channelFrameBodies)
         {
-            Response response;
-            if (val instanceof FrameBody)
-            {
-                response = new PerformativeResponse(channel, (FrameBody) val);
-            }
-            else if (val instanceof SaslFrameBody)
-            {
-                throw new UnsupportedOperationException("TODO: ");
-            }
-            else
+            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
             {
-                throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+                Response response;
+                Object val = channelFrameBody.getFrameBody();
+                int channel = channelFrameBody.getChannel();
+                if (val instanceof FrameBody)
+                {
+                    response = new PerformativeResponse((short) channel, (FrameBody) val);
+                }
+                else if (val instanceof SaslFrameBody)
+                {
+                    throw new UnsupportedOperationException("TODO: ");
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+                }
+                _responseQueue.add(response);
             }
-            _responseQueue.add(response);
-
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 11e15eb..f3d939f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -95,7 +95,7 @@ public class TransferTest extends ProtocolTestBase
     @Test
     @Ignore("QPID-7749")
     @SpecificationTest(section = "2.6.12",
-            description = "Transfering A Message.")
+            description = "Transferring A Message.")
     public void transfer() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress))

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 02312c4..d12bb0b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -29,12 +29,13 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
@@ -94,4 +95,42 @@ public class BeginTest extends ProtocolTestBase
             transport.doCloseConnection();
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.7.1",
+                       description = "A peer that receives a channel number outside the supported range MUST close "
+                                     + "the connection with the framing-error error-code..")
+    public void channelMax() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            UnsignedShort channelMax = UnsignedShort.valueOf((short) 5);
+            transport.doProtocolNegotiation();
+            Open open = new Open();
+            open.setChannelMax(channelMax);
+            open.setContainerId("testContainer");
+
+
+            transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
+
+            Begin begin = new Begin();
+            begin.setNextOutgoingId(UnsignedInteger.ZERO);
+            begin.setIncomingWindow(UnsignedInteger.ZERO);
+            begin.setOutgoingWindow(UnsignedInteger.ZERO);
+
+            UnsignedShort invalidChannel = UnsignedShort.valueOf((short) (channelMax.intValue() + 1));
+            transport.sendPerformative(begin, invalidChannel);
+            response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Close.class)));
+            Close responseClose = (Close) response.getFrameBody();
+            assertThat(responseClose.getError(), is(notNullValue()));
+            assertThat(responseClose.getError().getCondition(), equalTo(ConnectionError.FRAMING_ERROR));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org