You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/24 17:05:45 UTC
[qpid-broker-j] branch master updated: QPID-8343: [Broker-J][AMQP
1.0] Derive dynamic node durability from terminus expiry policy
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push:
new bf75f49 QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy
bf75f49 is described below
commit bf75f490170beb91f72547c00c365bf024cbaa1f
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 24 18:03:05 2019 +0100
QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy
---
.../apache/qpid/server/model/BrokerTestHelper.java | 4 ++
.../qpid/server/protocol/v1_0/Session_1_0.java | 43 ++++++++--------
.../protocol/v1_0/type/messaging/Source.java | 2 +-
.../protocol/v1_0/type/messaging/Target.java | 2 +-
.../protocol/v1_0/type/messaging/Terminus.java | 43 ++++++++++++++++
.../qpid/server/protocol/v1_0/Session_1_0Test.java | 60 ++++++++++++++++++++--
6 files changed, 129 insertions(+), 25 deletions(-)
diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
index a8d594c..fb36677 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
@@ -338,4 +338,8 @@ public class BrokerTestHelper
return mock;
}
+ public static <X extends ConfiguredObject> X mockAsSystemPrincipalSource(Class<X> clazz)
+ {
+ return mockWithSystemPrincipal(clazz, SYSTEM_PRINCIPAL);
+ }
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 931cf95..1300b4d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,7 +28,6 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -89,6 +88,8 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Terminus;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -660,7 +661,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
if (Boolean.TRUE.equals(target.getDynamic()))
{
- MessageDestination tempDestination = createDynamicDestination(link, target.getDynamicNodeProperties(), target.getCapabilities());
+ MessageDestination tempDestination = createDynamicDestination(link, target);
if(tempDestination != null)
{
target.setAddress(_primaryDomain + tempDestination.getName());
@@ -739,10 +740,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (Boolean.TRUE.equals(source.getDynamic()))
{
- final Set<Symbol> sourceCapabilities = source.getCapabilities() == null
- ? Collections.emptySet()
- : new HashSet<>(Arrays.asList(source.getCapabilities()));
- MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities);
+ MessageSource tempSource = createDynamicSource(link, source);
if(tempSource != null)
{
source.setAddress(_primaryDomain + tempSource.getName());
@@ -817,21 +815,23 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
- Map properties,
- final Set<Symbol> capabilities) throws AmqpErrorException
+ final Terminus terminus) throws AmqpErrorException
{
// TODO temporary topics?
final String queueName = "TempQueue" + UUID.randomUUID().toString();
try
{
- Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName);
+ final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, queueName);
- if (capabilities.contains(Symbol.valueOf("temporary-queue"))
- || capabilities.contains(Symbol.valueOf("temporary-topic")))
+ if (terminus.getCapabilities() != null)
{
- attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+ final Set<Symbol> capabilities = Sets.newHashSet(terminus.getCapabilities());
+ if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+ || capabilities.contains(Symbol.valueOf("temporary-topic")))
+ {
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+ }
}
-
return Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
}
@@ -848,15 +848,15 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private MessageDestination createDynamicDestination(final Link_1_0<?, ?> link,
- Map properties,
- final Symbol[] capabilities) throws AmqpErrorException
+ final Terminus terminus) throws AmqpErrorException
{
+ final Symbol[] capabilities = terminus.getCapabilities();
final Set<Symbol> capabilitySet = capabilities == null ? Collections.emptySet() : Sets.newHashSet(capabilities);
boolean isTopic = capabilitySet.contains(Symbol.valueOf("temporary-topic")) || capabilitySet.contains(Symbol.valueOf("topic"));
final String destName = (isTopic ? "TempTopic" : "TempQueue") + UUID.randomUUID().toString();
try
{
- Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, destName);
+ final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, destName);
Class<? extends MessageDestination> clazz = isTopic ? Exchange.class : MessageDestination.class;
@@ -883,11 +883,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
}
- private Map<String, Object> convertDynamicNodePropertiesToAttributes(final Link_1_0<?, ?> link,
- final Map properties,
- final String nodeName)
+ private Map<String, Object> createDynamicNodeAttributes(final Link_1_0<?, ?> link,
+ final Terminus terminus,
+ final String nodeName)
{
// TODO convert AMQP 1-0 node properties to queue attributes
+
+ final Map<Symbol, Object> properties = terminus.getDynamicNodeProperties();
+ final TerminusExpiryPolicy expiryPolicy = terminus.getExpiryPolicy();
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
@@ -895,7 +898,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
Map<String,Object> attributes = new HashMap<>();
attributes.put(ConfiguredObject.ID, UUID.randomUUID());
attributes.put(ConfiguredObject.NAME, nodeName);
- attributes.put(ConfiguredObject.DURABLE, true);
+ attributes.put(ConfiguredObject.DURABLE, TerminusExpiryPolicy.NEVER.equals(expiryPolicy));
if(lifetimePolicy instanceof DeleteOnNoLinks)
{
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
index 47e67d8..547958c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@CompositeType( symbolicDescriptor = "amqp:source:list", numericDescriptor = 0x0000000000000028L)
-public class Source implements BaseSource
+public class Source implements BaseSource, Terminus
{
@CompositeTypeField(index = 0)
private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
index 0f81898..e44670d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@CompositeType( symbolicDescriptor = "amqp:target:list", numericDescriptor = 0x0000000000000029L)
-public class Target implements BaseTarget
+public class Target implements BaseTarget, Terminus
{
@CompositeTypeField(index = 0)
private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
new file mode 100644
index 0000000..d750daa
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type.messaging;
+
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface Terminus
+{
+ String getAddress();
+
+ TerminusDurability getDurable();
+
+ TerminusExpiryPolicy getExpiryPolicy();
+
+ UnsignedInteger getTimeout();
+
+ Boolean getDynamic();
+
+ Map<Symbol, Object> getDynamicNodeProperties();
+
+ Symbol[] getCapabilities();
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index cfe2320..a0aaa68 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -69,6 +69,7 @@ import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -542,6 +543,53 @@ public class Session_1_0Test extends UnitTestBase
assertAttachSent(connection2, session2, attach);
}
+ @Test
+ public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnClose()
+ {
+ final Attach attach = createReceiverAttach(getTestName());
+ final Source source = createDynamicSource(new DeleteOnClose());
+ attach.setSource(source);
+
+ _session.receiveAttach(attach);
+
+ assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), false);
+ }
+
+ @Test
+ public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnCloseAndExpiryPolicyNever()
+ {
+ final Attach attach = createReceiverAttach(getTestName());
+ final Source source = createDynamicSource(new DeleteOnClose());
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ attach.setSource(source);
+
+ _session.receiveAttach(attach);
+
+ assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), true);
+ }
+
+ private Source createDynamicSource(final DeleteOnClose lifetimePolicy)
+ {
+ final Source source = new Source();
+ source.setDynamic(true);
+ source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, lifetimePolicy));
+ return source;
+ }
+
+ private String getDynamicNodeAddressFromAttachResponse()
+ {
+ final Attach sentAttach = captureAttach(_connection, _session, 0);
+ assertTrue(sentAttach.getSource() instanceof Source);
+ return ((Source) (sentAttach.getSource())).getAddress();
+ }
+
+ public void assertQueueDurability(final String queueName, final boolean expectedDurability)
+ {
+ final Queue queue = _virtualHost.getChildByName(Queue.class, queueName);
+ assertNotNull("Queue not found", queue);
+ assertEquals("Unexpected durability", queue.isDurable(), expectedDurability);
+ }
+
private void assertFilter(final Attach sentAttach, final String selectorExpression)
{
Source source = (Source)sentAttach.getSource();
@@ -734,7 +782,7 @@ public class Session_1_0Test extends UnitTestBase
final boolean isGlobal,
final boolean isShared)
{
- Attach attach = new Attach();
+ Attach attach = createReceiverAttach(linkName);
Source source = new Source();
List<Symbol> capabilities = new ArrayList<>();
@@ -760,14 +808,20 @@ public class Session_1_0Test extends UnitTestBase
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
}
+ source.setAddress(address);
attach.setSource(source);
+ return attach;
+ }
+
+ private Attach createReceiverAttach(String linkName)
+ {
+ final Attach attach = new Attach();
Target target = new Target();
attach.setTarget(target);
attach.setHandle(new UnsignedInteger(_handle++));
attach.setIncompleteUnsettled(false);
attach.setName(linkName);
attach.setRole(Role.RECEIVER);
- source.setAddress(address);
return attach;
}
@@ -778,7 +832,7 @@ public class Session_1_0Test extends UnitTestBase
private AMQPConnection_1_0 createAmqpConnection_1_0(String containerId)
{
- AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
+ AMQPConnection_1_0 connection = BrokerTestHelper.mockAsSystemPrincipalSource(AMQPConnection_1_0.class);
Subject subject =
new Subject(true, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
when(connection.getSubject()).thenReturn(subject);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org