You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/08 15:25:34 UTC

qpid-broker-j git commit: QPID-8058: [Broker-J][AMQP 1.0] Fix draining of temporary message sources on management node

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 377315ba5 -> 97ebcc8ef


QPID-8058: [Broker-J][AMQP 1.0] Fix draining of temporary message sources on management node


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/97ebcc8e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/97ebcc8e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/97ebcc8e

Branch: refs/heads/master
Commit: 97ebcc8ef5f0a84a42af5d2eede374d27f2e61c6
Parents: 377315b
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 15:25:17 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 8 15:25:17 2017 +0000

----------------------------------------------------------------------
 .../management/amqp/ManagementNodeConsumer.java |   4 +
 systests/protocol-tests-amqp-1-0/pom.xml        |   5 +-
 .../extensions/management/ManagementTest.java   | 131 +++++++++++++++++++
 .../apache/qpid/tests/utils/BrokerAdmin.java    |   1 +
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |   6 +
 .../utils/ExternalQpidBrokerAdminImpl.java      |   6 +
 6 files changed, 152 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 7a272fa..0662ea5 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -93,6 +93,10 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
                 return new MessageContainer(managementResponse, managementResponse.getMessageReference());
             }
         }
+        else
+        {
+            _target.noMessagesAvailable();
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index aa25351..ee5bd9f 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -45,7 +45,10 @@
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-broker-codegen</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-test-utils</artifactId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
new file mode 100644
index 0000000..0025fb5
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.management;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.Session_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ManagementTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.7",
+            description = "The drain flag indicates how the sender SHOULD behave when insufficient messages"
+                          + " are available to consume the current link-credit. If set, the sender will"
+                          + " (after sending all available messages) advance the delivery-count as much as possible,"
+                          + " consuming all link-credit, and send the flow state to the receiver.")
+    public void drainTemporaryMessageSource() throws Exception
+    {
+        assumeThat(getBrokerAdmin().isManagementSupported(), is(equalTo(true)));
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Target target = new Target();
+            target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            target.setDynamic(true);
+            target.setCapabilities(new Symbol[]{Symbol.valueOf("temporary-queue")});
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                     .openHostname("$management")
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachTarget(target)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+
+            assertThat(attachResponse.getSource(), is(notNullValue()));
+            assertThat(attachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            final Attach receiverResponse = interaction.attachHandle(UnsignedInteger.ONE).attachRole(Role.RECEIVER)
+                                                       .attachSourceAddress(newTemporaryNodeAddress)
+                                                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                       .attach().consumeResponse().getLatestResponse(Attach.class);
+
+            assertThat(receiverResponse.getSource(), is(instanceOf(Source.class)));
+            assertThat(((Source)receiverResponse.getSource()).getAddress(), is(equalTo(newTemporaryNodeAddress)));
+
+            // 2.6.8  Synchronous Get
+
+            // grant credit of 1
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            // send drain to ensure the sender promptly advances the delivery-count until link-credit is consumed
+            interaction.flowDrain(true).flow();
+
+            Flow flow = interaction.consumeResponse().getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+            assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
+
+            interaction.doCloseConnection();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index 6875460..3b57431 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -51,6 +51,7 @@ public interface BrokerAdmin extends Pluggable
     boolean isSASLMechanismSupported(String mechanismName);
     boolean isWebSocketSupported();
     boolean isQueueDepthSupported();
+    boolean isManagementSupported();
 
     String getValidUsername();
     String getValidPassword();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index f171f4d..0bb57ce 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -364,6 +364,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public boolean isManagementSupported()
+    {
+        return true;
+    }
+
+    @Override
     public String getValidUsername()
     {
         return "guest";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index e935067..f359053 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -132,6 +132,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public boolean isManagementSupported()
+    {
+        return false;
+    }
+
+    @Override
     public boolean isSASLMechanismSupported(final String mechanismName)
     {
         return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org