You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/08 15:25:34 UTC
qpid-broker-j git commit: QPID-8058: [Broker-J][AMQP 1.0] Fix
draining of temporary message sources on management node
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 377315ba5 -> 97ebcc8ef
QPID-8058: [Broker-J][AMQP 1.0] Fix draining of temporary message sources on management node
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/97ebcc8e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/97ebcc8e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/97ebcc8e
Branch: refs/heads/master
Commit: 97ebcc8ef5f0a84a42af5d2eede374d27f2e61c6
Parents: 377315b
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 15:25:17 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 8 15:25:17 2017 +0000
----------------------------------------------------------------------
.../management/amqp/ManagementNodeConsumer.java | 4 +
systests/protocol-tests-amqp-1-0/pom.xml | 5 +-
.../extensions/management/ManagementTest.java | 131 +++++++++++++++++++
.../apache/qpid/tests/utils/BrokerAdmin.java | 1 +
.../utils/EmbeddedBrokerPerClassAdminImpl.java | 6 +
.../utils/ExternalQpidBrokerAdminImpl.java | 6 +
6 files changed, 152 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 7a272fa..0662ea5 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -93,6 +93,10 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
return new MessageContainer(managementResponse, managementResponse.getMessageReference());
}
}
+ else
+ {
+ _target.noMessagesAvailable();
+ }
return null;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index aa25351..ee5bd9f 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -45,7 +45,10 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-codegen</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
new file mode 100644
index 0000000..0025fb5
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.management;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.Session_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ManagementTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.7",
+ description = "The drain flag indicates how the sender SHOULD behave when insufficient messages"
+ + " are available to consume the current link-credit. If set, the sender will"
+ + " (after sending all available messages) advance the delivery-count as much as possible,"
+ + " consuming all link-credit, and send the flow state to the receiver.")
+ public void drainTemporaryMessageSource() throws Exception
+ {
+ assumeThat(getBrokerAdmin().isManagementSupported(), is(equalTo(true)));
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Target target = new Target();
+ target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ target.setDynamic(true);
+ target.setCapabilities(new Symbol[]{Symbol.valueOf("temporary-queue")});
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
+ .openHostname("$management")
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTarget(target)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(attachResponse.getSource(), is(notNullValue()));
+ assertThat(attachResponse.getTarget(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ interaction.consumeResponse().getLatestResponse(Flow.class);
+
+ final Attach receiverResponse = interaction.attachHandle(UnsignedInteger.ONE).attachRole(Role.RECEIVER)
+ .attachSourceAddress(newTemporaryNodeAddress)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach().consumeResponse().getLatestResponse(Attach.class);
+
+ assertThat(receiverResponse.getSource(), is(instanceOf(Source.class)));
+ assertThat(((Source)receiverResponse.getSource()).getAddress(), is(equalTo(newTemporaryNodeAddress)));
+
+ // 2.6.8 Synchronous Get
+
+ // grant credit of 1
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ // send drain to ensure the sender promptly advances the delivery-count until link-credit is consumed
+ interaction.flowDrain(true).flow();
+
+ Flow flow = interaction.consumeResponse().getLatestResponse(Flow.class);
+ assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
+
+ interaction.doCloseConnection();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index 6875460..3b57431 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -51,6 +51,7 @@ public interface BrokerAdmin extends Pluggable
boolean isSASLMechanismSupported(String mechanismName);
boolean isWebSocketSupported();
boolean isQueueDepthSupported();
+ boolean isManagementSupported();
String getValidUsername();
String getValidPassword();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index f171f4d..0bb57ce 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -364,6 +364,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
}
@Override
+ public boolean isManagementSupported()
+ {
+ return true;
+ }
+
+ @Override
public String getValidUsername()
{
return "guest";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index e935067..f359053 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -132,6 +132,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
}
@Override
+ public boolean isManagementSupported()
+ {
+ return false;
+ }
+
+ @Override
public boolean isSASLMechanismSupported(final String mechanismName)
{
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org