You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/22 06:36:09 UTC
[2/2] qpid-broker-j git commit: QPID-7793: Perform contiguous frames for the same channel under the same access controller context
QPID-7793: Perform contiguous frames for the same channel under the same access controller context
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1b5d9ad1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1b5d9ad1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1b5d9ad1
Branch: refs/heads/master
Commit: 1b5d9ad134be779cd2dfa91b7585264c44a2aa32
Parents: 17925e5
Author: Keith Wall <ke...@gmail.com>
Authored: Mon May 22 07:34:09 2017 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon May 22 07:35:31 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 125 ++++++++++---------
.../server/protocol/v1_0/ConnectionHandler.java | 5 +-
.../protocol/v1_0/framing/FrameHandler.java | 31 +++--
.../v1_0/type/transport/ChannelFrameBody.java | 27 ++++
.../qpid/tests/protocol/v1_0/InputHandler.java | 41 +++---
.../protocol/v1_0/messaging/TransferTest.java | 2 +-
.../v1_0/transport/session/BeginTest.java | 41 +++++-
7 files changed, 175 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 809fb58..c5ba4a4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +90,7 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
@@ -295,15 +299,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.receiveAttach(attach);
- return null;
- }
- }, session.getAccessControllerContext());
+ session.receiveAttach(attach);
}
else
{
@@ -312,16 +308,61 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public void receive(final short channel, final Object frame)
+ public void receive(final List<ChannelFrameBody> channelFrameBodies)
{
- FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
- if (frame instanceof FrameBody)
+ PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator());
+
+ while(itr.hasNext())
{
- ((FrameBody) frame).invoke(channel, this);
+ final ChannelFrameBody channelFrameBody = itr.next();
+ final int frameChannel = channelFrameBody.getChannel();
+
+ Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length ? null : _receivingSessions[frameChannel];
+ if (session != null)
+ {
+ final AccessControlContext context = session.getAccessControllerContext();
+ AccessController.doPrivileged((PrivilegedAction<Void>) () ->
+ {
+ ChannelFrameBody channelFrame = channelFrameBody;
+ boolean nextIsSameChannel;
+ do
+ {
+ received(frameChannel, channelFrame.getFrameBody());
+ nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel();
+ if (nextIsSameChannel)
+ {
+ channelFrame = itr.next();
+ }
+ }
+ while (nextIsSameChannel);
+ return null;
+ }, context);
+ }
+ else
+ {
+ received(frameChannel, channelFrameBody.getFrameBody());
+ }
+ }
+ }
+
+ private void received(int channel, Object val)
+ {
+ if (channel > getChannelMax())
+ {
+ Error error = new Error(ConnectionError.FRAMING_ERROR,
+ String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
+ handleError(error);
+ return;
}
- else if (frame instanceof SaslFrameBody)
+
+ FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, val);
+ if (val instanceof FrameBody)
+ {
+ ((FrameBody) val).invoke((short) channel, this);
+ }
+ else if (val instanceof SaslFrameBody)
{
- ((SaslFrameBody) frame).invoke(channel, this);
+ ((SaslFrameBody) val).invoke((short) channel, this);
}
}
@@ -541,17 +582,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- _receivingSessions[channel] = null;
-
- session.receiveEnd(end);
- return null;
- }
- }, session.getAccessControllerContext());
+ _receivingSessions[channel] = null;
+ session.receiveEnd(end);
}
else
{
@@ -571,15 +603,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.receiveDisposition(disposition);
- return null;
- }
- }, session.getAccessControllerContext());
+ session.receiveDisposition(disposition);
}
else
{
@@ -637,6 +661,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
synchronized (_blockingLock)
{
+
_sessions.add(session);
if (_blocking)
{
@@ -686,15 +711,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.receiveTransfer(transfer);
- return null;
- }
- }, session.getAccessControllerContext());
+ session.receiveTransfer(transfer);
}
else
{
@@ -708,15 +725,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.receiveFlow(flow);
- return null;
- }
- }, session.getAccessControllerContext());
+ session.receiveFlow(flow);
}
else
{
@@ -897,15 +906,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
final Session_1_0 session = getSession(channel);
if (session != null)
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.receiveDetach(detach);
- return null;
- }
- }, session.getAccessControllerContext());
+ session.receiveDetach(detach);
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
index 549fe4b..03a3794 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
@@ -59,5 +62,5 @@ public interface ConnectionHandler extends SASLEndpoint
boolean closedForInput();
- void receive(short channel, Object val);
+ void receive(List<ChannelFrameBody> channelFrameBodies);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index 200979a..f8e84f3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.protocol.v1_0.framing;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Formatter;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -60,6 +63,7 @@ public class FrameHandler implements ProtocolHandler
int size;
int remaining;
+ List<ChannelFrameBody> channelFrameBodies = new ArrayList<>();
while ((remaining = in.remaining()) >=8 && frameParsingError == null)
{
@@ -133,16 +137,7 @@ public class FrameHandler implements ProtocolHandler
break;
}
- int channel = ((int)in.getShort()) & 0xffff;
-
- if (channel > _connectionHandler.getChannelMax())
- {
- frameParsingError = createFramingError(
- "specified channel %d larger than maximum channel %d",
- channel,
- _connectionHandler.getChannelMax());
- break;
- }
+ int channel = in.getUnsignedShort();
if (dataOffset != 8)
{
@@ -173,7 +168,20 @@ public class FrameHandler implements ProtocolHandler
val);
}
}
- _connectionHandler.receive((short) channel, val);
+ channelFrameBodies.add(new ChannelFrameBody()
+ {
+ @Override
+ public int getChannel()
+ {
+ return channel;
+ }
+
+ @Override
+ public Object getFrameBody()
+ {
+ return val;
+ }
+ });
}
catch (AmqpErrorException ex)
{
@@ -184,6 +192,7 @@ public class FrameHandler implements ProtocolHandler
dup.dispose();
}
}
+ _connectionHandler.receive(channelFrameBodies);
if (frameParsingError != null)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
new file mode 100644
index 0000000..b6ce019
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/ChannelFrameBody.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type.transport;
+
+public interface ChannelFrameBody
+{
+ int getChannel();
+ Object getFrameBody();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
index 770779e..cb5a656 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
@@ -19,6 +19,7 @@
package org.apache.qpid.tests.protocol.v1_0;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import io.netty.buffer.ByteBuf;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
@@ -137,13 +139,6 @@ public class InputHandler extends ChannelInboundHandlerAdapter
}
}
- @Override
- public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception
- {
- LOGGER.debug("KWDEBUG channelReadComplete");
- super.channelReadComplete(ctx);
- }
-
private class MyConnectionHandler implements ConnectionHandler
{
@Override
@@ -225,23 +220,27 @@ public class InputHandler extends ChannelInboundHandlerAdapter
}
@Override
- public void receive(final short channel, final Object val)
+ public void receive(final List<ChannelFrameBody> channelFrameBodies)
{
- Response response;
- if (val instanceof FrameBody)
- {
- response = new PerformativeResponse(channel, (FrameBody) val);
- }
- else if (val instanceof SaslFrameBody)
- {
- throw new UnsupportedOperationException("TODO: ");
- }
- else
+ for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
{
- throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+ Response response;
+ Object val = channelFrameBody.getFrameBody();
+ int channel = channelFrameBody.getChannel();
+ if (val instanceof FrameBody)
+ {
+ response = new PerformativeResponse((short) channel, (FrameBody) val);
+ }
+ else if (val instanceof SaslFrameBody)
+ {
+ throw new UnsupportedOperationException("TODO: ");
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+ }
+ _responseQueue.add(response);
}
- _responseQueue.add(response);
-
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 11e15eb..f3d939f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -95,7 +95,7 @@ public class TransferTest extends ProtocolTestBase
@Test
@Ignore("QPID-7749")
@SpecificationTest(section = "2.6.12",
- description = "Transfering A Message.")
+ description = "Transferring A Message.")
public void transfer() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1b5d9ad1/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 02312c4..d12bb0b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -29,12 +29,13 @@ import java.net.InetSocketAddress;
import org.junit.Test;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
@@ -94,4 +95,42 @@ public class BeginTest extends ProtocolTestBase
transport.doCloseConnection();
}
}
+
+ @Test
+ @SpecificationTest(section = "2.7.1",
+ description = "A peer that receives a channel number outside the supported range MUST close "
+ + "the connection with the framing-error error-code..")
+ public void channelMax() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ UnsignedShort channelMax = UnsignedShort.valueOf((short) 5);
+ transport.doProtocolNegotiation();
+ Open open = new Open();
+ open.setChannelMax(channelMax);
+ open.setContainerId("testContainer");
+
+
+ transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
+
+ Begin begin = new Begin();
+ begin.setNextOutgoingId(UnsignedInteger.ZERO);
+ begin.setIncomingWindow(UnsignedInteger.ZERO);
+ begin.setOutgoingWindow(UnsignedInteger.ZERO);
+
+ UnsignedShort invalidChannel = UnsignedShort.valueOf((short) (channelMax.intValue() + 1));
+ transport.sendPerformative(begin, invalidChannel);
+ response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Close.class)));
+ Close responseClose = (Close) response.getFrameBody();
+ assertThat(responseClose.getError(), is(notNullValue()));
+ assertThat(responseClose.getError().getCondition(), equalTo(ConnectionError.FRAMING_ERROR));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org