You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/15 17:40:41 UTC
qpid-broker-j git commit: QPID-8100: [Broker-J] [AMQP 0-10] Ensure
that in error cases,
session.detach is sent on the same channel as arrived the incoming frame.
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 59bd08a31 -> 1911d163b
QPID-8100: [Broker-J] [AMQP 0-10] Ensure that in error cases, session.detach is sent on the same channel as arrived the incoming frame.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1911d163
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1911d163
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1911d163
Branch: refs/heads/master
Commit: 1911d163b2fe21e6630ccf16730d30917ca888c9
Parents: 59bd08a
Author: Keith Wall <kw...@apache.org>
Authored: Thu Feb 15 17:39:38 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Feb 15 17:39:56 2018 +0000
----------------------------------------------------------------------
.../server/protocol/v0_10/ServerConnection.java | 5 +-
.../v0_10/ServerConnectionDelegate.java | 23 +++++----
.../qpid/tests/protocol/v0_10/SessionTest.java | 51 ++++++++++++++++++--
3 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 1166058..6cc8fe8 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -136,7 +136,10 @@ public class ServerConnection extends ConnectionInvoker
@Override
protected void invoke(Method method)
{
- method.setChannel(0);
+ if (method.isConnectionControl())
+ {
+ method.setChannel(0);
+ }
send(method);
if (!method.isBatch())
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 43c3bf1..227c48d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -401,17 +401,19 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
{
assertState(serverConnection, ConnectionState.OPEN);
- ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
-
- final ServerSession serverSession =
- new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
- final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
- serverSession);
- session.create();
- serverSession.setModelObject(session);
+ // We ignore the force flag
if(isSessionNameUnique(atc.getName(), serverConnection))
{
+ ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+
+ final ServerSession serverSession =
+ new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
+ final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
+ serverSession);
+ session.create();
+ serverSession.setModelObject(session);
+
serverConnection.map(serverSession, atc.getChannel());
serverConnection.registerSession(serverSession);
serverSession.sendSessionAttached(atc.getName());
@@ -419,8 +421,9 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
}
else
{
- serverSession.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
- serverSession.closed();
+ final SessionDetached detached = new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY);
+ detached.setChannel(atc.getChannel());
+ serverConnection.invoke(detached);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
index abb88e9..a5cbbb1 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
@@ -31,6 +31,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetachCode;
import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -55,14 +56,16 @@ public class SessionTest extends BrokerAdminUsingTestBase
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ final int channelId = 1;
SessionAttached sessionAttached = interaction.openAnonymousConnection()
- .channelId(1)
+ .channelId(channelId)
.session()
.attachName(sessionName)
.attach()
.consumeResponse()
.getLatestResponse(SessionAttached.class);
assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName));
+ assertThat(sessionAttached.getChannel(), IsEqual.equalTo(channelId));
}
}
@@ -76,8 +79,9 @@ public class SessionTest extends BrokerAdminUsingTestBase
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ final int channelId = 1;
SessionDetached sessionDetached = interaction.openAnonymousConnection()
- .channelId(1)
+ .channelId(channelId)
.session()
.attachName(sessionName)
.attach()
@@ -89,6 +93,7 @@ public class SessionTest extends BrokerAdminUsingTestBase
.getLatestResponse(SessionDetached.class);
assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+ assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId));
}
}
@@ -104,8 +109,9 @@ public class SessionTest extends BrokerAdminUsingTestBase
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ final int channelId = 1;
SessionDetached sessionDetached = interaction.openAnonymousConnection()
- .channelId(1)
+ .channelId(channelId)
.session()
.detachName(sessionName)
.detach()
@@ -113,9 +119,48 @@ public class SessionTest extends BrokerAdminUsingTestBase
.getLatestResponse(SessionDetached.class);
assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+ assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId));
}
}
+ @Test
+ @SpecificationTest(section = "9.session",
+ description = "A session MUST NOT be attached to more than one transport at a time.")
+ public void attachSameSessionTwiceDisallowed() throws Exception
+ {
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction1 = transport1.newInteraction();
+ byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ final int channelId1 = 1;
+ SessionAttached sessionAttached = interaction1.openAnonymousConnection()
+ .channelId(channelId1)
+ .session()
+ .attachName(sessionName)
+ .attach()
+ .consumeResponse()
+ .getLatestResponse(SessionAttached.class);
+ assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName));
+ assertThat(sessionAttached.getChannel(), IsEqual.equalTo(channelId1));
+
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ final int channelId2 = 2;
+ SessionDetached sessionDetached = interaction2.openAnonymousConnection()
+ .channelId(channelId2)
+ .session()
+ .attachName(sessionName)
+ .attach()
+ .consumeResponse()
+ .getLatestResponse(SessionDetached.class);
+ assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+ assertThat(sessionDetached.getCode(), IsEqual.equalTo(SessionDetachCode.SESSION_BUSY));
+ assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId2));
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org