You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/29 10:08:39 UTC

[qpid-broker-j] 03/04: QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 656edb576f20f6d983b611b9f8d6024bbb8c8ea4
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 24 18:03:05 2019 +0100

    QPID-8343: [Broker-J][AMQP 1.0] Derive dynamic node durability from terminus expiry policy
    
    (cherry picked from commit bf75f490170beb91f72547c00c365bf024cbaa1f)
---
 .../apache/qpid/server/model/BrokerTestHelper.java |  4 ++
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 43 ++++++++--------
 .../protocol/v1_0/type/messaging/Source.java       |  2 +-
 .../protocol/v1_0/type/messaging/Target.java       |  2 +-
 .../protocol/v1_0/type/messaging/Terminus.java     | 43 ++++++++++++++++
 .../qpid/server/protocol/v1_0/Session_1_0Test.java | 60 ++++++++++++++++++++--
 6 files changed, 129 insertions(+), 25 deletions(-)

diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
index a8d594c..fb36677 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
@@ -338,4 +338,8 @@ public class BrokerTestHelper
         return mock;
     }
 
+    public static <X extends ConfiguredObject> X mockAsSystemPrincipalSource(Class<X> clazz)
+    {
+        return mockWithSystemPrincipal(clazz, SYSTEM_PRINCIPAL);
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 931cf95..1300b4d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,7 +28,6 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -89,6 +88,8 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Terminus;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -660,7 +661,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             if (Boolean.TRUE.equals(target.getDynamic()))
             {
-                MessageDestination tempDestination = createDynamicDestination(link, target.getDynamicNodeProperties(), target.getCapabilities());
+                MessageDestination tempDestination = createDynamicDestination(link, target);
                 if(tempDestination != null)
                 {
                     target.setAddress(_primaryDomain + tempDestination.getName());
@@ -739,10 +740,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
         if (Boolean.TRUE.equals(source.getDynamic()))
         {
-            final Set<Symbol> sourceCapabilities = source.getCapabilities() == null
-                    ? Collections.emptySet()
-                    : new HashSet<>(Arrays.asList(source.getCapabilities()));
-            MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities);
+            MessageSource tempSource = createDynamicSource(link, source);
             if(tempSource != null)
             {
                 source.setAddress(_primaryDomain + tempSource.getName());
@@ -817,21 +815,23 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
-                                              Map properties,
-                                              final Set<Symbol> capabilities) throws AmqpErrorException
+                                              final Terminus terminus) throws AmqpErrorException
     {
         // TODO temporary topics?
         final String queueName = "TempQueue" + UUID.randomUUID().toString();
         try
         {
-            Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName);
+            final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, queueName);
 
-            if (capabilities.contains(Symbol.valueOf("temporary-queue"))
-                || capabilities.contains(Symbol.valueOf("temporary-topic")))
+            if (terminus.getCapabilities() != null)
             {
-                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+                final Set<Symbol> capabilities = Sets.newHashSet(terminus.getCapabilities());
+                if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+                    || capabilities.contains(Symbol.valueOf("temporary-topic")))
+                {
+                    attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+                }
             }
-
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
         }
@@ -848,15 +848,15 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
 
     private MessageDestination createDynamicDestination(final Link_1_0<?, ?> link,
-                                                        Map properties,
-                                                        final Symbol[] capabilities) throws AmqpErrorException
+                                                        final Terminus terminus) throws AmqpErrorException
     {
+        final Symbol[] capabilities = terminus.getCapabilities();
         final Set<Symbol> capabilitySet = capabilities == null ? Collections.emptySet() : Sets.newHashSet(capabilities);
         boolean isTopic = capabilitySet.contains(Symbol.valueOf("temporary-topic")) || capabilitySet.contains(Symbol.valueOf("topic"));
         final String destName = (isTopic ? "TempTopic" : "TempQueue") + UUID.randomUUID().toString();
         try
         {
-            Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, destName);
+            final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, destName);
 
 
             Class<? extends MessageDestination> clazz = isTopic ? Exchange.class : MessageDestination.class;
@@ -883,11 +883,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
     }
 
-    private Map<String, Object> convertDynamicNodePropertiesToAttributes(final Link_1_0<?, ?> link,
-                                                                         final Map properties,
-                                                                         final String nodeName)
+    private Map<String, Object> createDynamicNodeAttributes(final Link_1_0<?, ?> link,
+                                                            final Terminus terminus,
+                                                            final String nodeName)
     {
         // TODO convert AMQP 1-0 node properties to queue attributes
+
+        final Map<Symbol, Object> properties = terminus.getDynamicNodeProperties();
+        final TerminusExpiryPolicy expiryPolicy = terminus.getExpiryPolicy();
         LifetimePolicy lifetimePolicy = properties == null
                                         ? null
                                         : (LifetimePolicy) properties.get(LIFETIME_POLICY);
@@ -895,7 +898,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         Map<String,Object> attributes = new HashMap<>();
         attributes.put(ConfiguredObject.ID, UUID.randomUUID());
         attributes.put(ConfiguredObject.NAME, nodeName);
-        attributes.put(ConfiguredObject.DURABLE, true);
+        attributes.put(ConfiguredObject.DURABLE, TerminusExpiryPolicy.NEVER.equals(expiryPolicy));
 
         if(lifetimePolicy instanceof DeleteOnNoLinks)
         {
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
index 47e67d8..547958c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 @CompositeType( symbolicDescriptor = "amqp:source:list", numericDescriptor = 0x0000000000000028L)
-public class Source implements BaseSource
+public class Source implements BaseSource, Terminus
 {
     @CompositeTypeField(index = 0)
     private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
index 0f81898..e44670d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 @CompositeType( symbolicDescriptor = "amqp:target:list", numericDescriptor = 0x0000000000000029L)
-public class Target implements BaseTarget
+public class Target implements BaseTarget, Terminus
 {
     @CompositeTypeField(index = 0)
     private String _address;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
new file mode 100644
index 0000000..d750daa
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Terminus.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type.messaging;
+
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface Terminus
+{
+    String getAddress();
+
+    TerminusDurability getDurable();
+
+    TerminusExpiryPolicy getExpiryPolicy();
+
+    UnsignedInteger getTimeout();
+
+    Boolean getDynamic();
+
+    Map<Symbol, Object> getDynamicNodeProperties();
+
+    Symbol[] getCapabilities();
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index cfe2320..a0aaa68 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -69,6 +69,7 @@ import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -542,6 +543,53 @@ public class Session_1_0Test extends UnitTestBase
         assertAttachSent(connection2, session2, attach);
     }
 
+    @Test
+    public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnClose()
+    {
+        final Attach attach = createReceiverAttach(getTestName());
+        final Source source = createDynamicSource(new DeleteOnClose());
+        attach.setSource(source);
+
+        _session.receiveAttach(attach);
+
+        assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), false);
+    }
+
+    @Test
+    public void testAttachSourceDynamicWithLifeTimePolicyDeleteOnCloseAndExpiryPolicyNever()
+    {
+        final Attach attach = createReceiverAttach(getTestName());
+        final Source source = createDynamicSource(new DeleteOnClose());
+        source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        attach.setSource(source);
+
+        _session.receiveAttach(attach);
+
+        assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), true);
+    }
+
+    private Source createDynamicSource(final DeleteOnClose lifetimePolicy)
+    {
+        final Source source = new Source();
+        source.setDynamic(true);
+        source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, lifetimePolicy));
+        return source;
+    }
+
+    private String getDynamicNodeAddressFromAttachResponse()
+    {
+        final Attach sentAttach = captureAttach(_connection, _session, 0);
+        assertTrue(sentAttach.getSource() instanceof Source);
+        return ((Source) (sentAttach.getSource())).getAddress();
+    }
+
+    public void assertQueueDurability(final String queueName, final boolean expectedDurability)
+    {
+        final Queue queue = _virtualHost.getChildByName(Queue.class, queueName);
+        assertNotNull("Queue not found", queue);
+        assertEquals("Unexpected durability", expectedDurability, queue.isDurable()); // JUnit order: expected, then actual
+    }
+
     private void assertFilter(final Attach sentAttach, final String selectorExpression)
     {
         Source source = (Source)sentAttach.getSource();
@@ -734,7 +782,7 @@ public class Session_1_0Test extends UnitTestBase
                                 final boolean isGlobal,
                                 final boolean isShared)
     {
-        Attach attach = new Attach();
+        Attach attach = createReceiverAttach(linkName);
         Source source = new Source();
 
         List<Symbol> capabilities = new ArrayList<>();
@@ -760,14 +808,20 @@ public class Session_1_0Test extends UnitTestBase
             source.setDurable(TerminusDurability.NONE);
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
+        source.setAddress(address);
         attach.setSource(source);
+        return attach;
+    }
+
+    private Attach createReceiverAttach(String linkName)
+    {
+        final Attach attach = new Attach();
         Target target = new Target();
         attach.setTarget(target);
         attach.setHandle(new UnsignedInteger(_handle++));
         attach.setIncompleteUnsettled(false);
         attach.setName(linkName);
         attach.setRole(Role.RECEIVER);
-        source.setAddress(address);
         return attach;
     }
 
@@ -778,7 +832,7 @@ public class Session_1_0Test extends UnitTestBase
 
     private AMQPConnection_1_0 createAmqpConnection_1_0(String containerId)
     {
-        AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
+        AMQPConnection_1_0 connection = BrokerTestHelper.mockAsSystemPrincipalSource(AMQPConnection_1_0.class);
         Subject subject =
                 new Subject(true, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
         when(connection.getSubject()).thenReturn(subject);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org