You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/15 17:40:41 UTC

qpid-broker-j git commit: QPID-8100: [Broker-J] [AMQP 0-10] Ensure that in error cases, session.detach is sent on the same channel as arrived the incoming frame.

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 59bd08a31 -> 1911d163b


QPID-8100: [Broker-J] [AMQP 0-10] Ensure that in error cases, session.detach is sent on the same channel as arrived the incoming frame.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1911d163
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1911d163
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1911d163

Branch: refs/heads/master
Commit: 1911d163b2fe21e6630ccf16730d30917ca888c9
Parents: 59bd08a
Author: Keith Wall <kw...@apache.org>
Authored: Thu Feb 15 17:39:38 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Feb 15 17:39:56 2018 +0000

----------------------------------------------------------------------
 .../server/protocol/v0_10/ServerConnection.java |  5 +-
 .../v0_10/ServerConnectionDelegate.java         | 23 +++++----
 .../qpid/tests/protocol/v0_10/SessionTest.java  | 51 ++++++++++++++++++--
 3 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 1166058..6cc8fe8 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -136,7 +136,10 @@ public class ServerConnection extends ConnectionInvoker
     @Override
     protected void invoke(Method method)
     {
-        method.setChannel(0);
+        if (method.isConnectionControl())
+        {
+            method.setChannel(0);
+        }
         send(method);
         if (!method.isBatch())
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 43c3bf1..227c48d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -401,17 +401,19 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
     {
         assertState(serverConnection, ConnectionState.OPEN);
 
-        ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
-
-        final ServerSession serverSession =
-                new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
-        final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
-                                                      serverSession);
-        session.create();
-        serverSession.setModelObject(session);
+        // We ignore the force flag
 
         if(isSessionNameUnique(atc.getName(), serverConnection))
         {
+            ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+
+            final ServerSession serverSession =
+                    new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
+            final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
+                                                          serverSession);
+            session.create();
+            serverSession.setModelObject(session);
+
             serverConnection.map(serverSession, atc.getChannel());
             serverConnection.registerSession(serverSession);
             serverSession.sendSessionAttached(atc.getName());
@@ -419,8 +421,9 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i
         }
         else
         {
-            serverSession.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
-            serverSession.closed();
+            final SessionDetached detached = new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY);
+            detached.setChannel(atc.getChannel());
+            serverConnection.invoke(detached);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1911d163/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
index abb88e9..a5cbbb1 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
@@ -31,6 +31,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetachCode;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -55,14 +56,16 @@ public class SessionTest extends BrokerAdminUsingTestBase
         {
             final Interaction interaction = transport.newInteraction();
             byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            final int channelId = 1;
             SessionAttached sessionAttached = interaction.openAnonymousConnection()
-                                                         .channelId(1)
+                                                         .channelId(channelId)
                                                          .session()
                                                          .attachName(sessionName)
                                                          .attach()
                                                          .consumeResponse()
                                                          .getLatestResponse(SessionAttached.class);
             assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName));
+            assertThat(sessionAttached.getChannel(), IsEqual.equalTo(channelId));
         }
     }
 
@@ -76,8 +79,9 @@ public class SessionTest extends BrokerAdminUsingTestBase
         {
             final Interaction interaction = transport.newInteraction();
             byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            final int channelId = 1;
             SessionDetached sessionDetached = interaction.openAnonymousConnection()
-                                                         .channelId(1)
+                                                         .channelId(channelId)
                                                          .session()
                                                          .attachName(sessionName)
                                                          .attach()
@@ -89,6 +93,7 @@ public class SessionTest extends BrokerAdminUsingTestBase
                                                          .getLatestResponse(SessionDetached.class);
 
             assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+            assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId));
         }
     }
 
@@ -104,8 +109,9 @@ public class SessionTest extends BrokerAdminUsingTestBase
         {
             final Interaction interaction = transport.newInteraction();
             byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            final int channelId = 1;
             SessionDetached sessionDetached = interaction.openAnonymousConnection()
-                                                         .channelId(1)
+                                                         .channelId(channelId)
                                                          .session()
                                                          .detachName(sessionName)
                                                          .detach()
@@ -113,9 +119,48 @@ public class SessionTest extends BrokerAdminUsingTestBase
                                                          .getLatestResponse(SessionDetached.class);
 
             assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+            assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId));
         }
     }
 
+    @Test
+    @SpecificationTest(section = "9.session",
+            description = "A session MUST NOT be attached to more than one transport at a time.")
+    public void attachSameSessionTwiceDisallowed() throws Exception
+    {
+        try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction1 = transport1.newInteraction();
+            byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            final int channelId1 = 1;
+            SessionAttached sessionAttached = interaction1.openAnonymousConnection()
+                                                          .channelId(channelId1)
+                                                          .session()
+                                                          .attachName(sessionName)
+                                                          .attach()
+                                                          .consumeResponse()
+                                                          .getLatestResponse(SessionAttached.class);
 
+            assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName));
+            assertThat(sessionAttached.getChannel(), IsEqual.equalTo(channelId1));
+
+
+            try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+            {
+                final Interaction interaction2 = transport2.newInteraction();
+                final int channelId2 = 2;
+                SessionDetached sessionDetached = interaction2.openAnonymousConnection()
+                                                              .channelId(channelId2)
+                                                              .session()
+                                                              .attachName(sessionName)
+                                                              .attach()
+                                                              .consumeResponse()
+                                                              .getLatestResponse(SessionDetached.class);
 
+                assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+                assertThat(sessionDetached.getCode(), IsEqual.equalTo(SessionDetachCode.SESSION_BUSY));
+                assertThat(sessionDetached.getChannel(), IsEqual.equalTo(channelId2));
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org