You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/11/24 00:20:25 UTC

[qpid-broker-j] branch 7.1.x updated (99237c7 -> cdde19c)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 99237c7  QPID-8373: Modify test to use synchrnous publishing and preconfigured receive interval
     new 8a31ba3  QPID-8381: [Broker-J] Upgrade Oracle BDB JE dependency to version 7.5.11
     new f1742d3  QPID-8378: [Broker-J] Make sure that message reference is released for deleted node
     new a7d5b79  QPID-8377 : Allow configuration of behaviour with unknown exchange declare arguments
     new cdde19c  QPID-8377 : add test to verify IGNORE behaviour

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/qpid/server/model/Exchange.java     | 11 +++++
 .../apache/qpid/server/queue/AbstractQueue.java    |  8 +---
 .../qpid/server/queue/AbstractQueueTestBase.java   | 53 ++++++++++++++++++++++
 .../protocol/v0_10/ServerSessionDelegate.java      | 24 ++++++++--
 .../qpid/server/protocol/v0_8/AMQChannel.java      | 22 +++++++--
 pom.xml                                            |  2 +-
 ...ExchangeDeclareInvalidOptionBehaviourTest.java} | 50 ++++----------------
 7 files changed, 113 insertions(+), 57 deletions(-)
 copy systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/{ExchangeTest.java => ExchangeDeclareInvalidOptionBehaviourTest.java} (51%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 03/04: QPID-8377 : Allow configuration of behaviour with unknown exchange declare arguments

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit a7d5b793bf35d1b3638c694753aa3ed76aa0838c
Author: Robert Godfrey <rg...@apache.org>
AuthorDate: Sun Nov 17 23:12:30 2019 +0100

    QPID-8377 : Allow configuration of behaviour with unknown exchange declare arguments
    
    (cherry picked from commit 03b751e1467f7482c825f6c5f09a89bb0157b057)
---
 .../org/apache/qpid/server/model/Exchange.java     | 11 ++++++++++
 .../protocol/v0_10/ServerSessionDelegate.java      | 24 +++++++++++++++++-----
 .../qpid/server/protocol/v0_8/AMQChannel.java      | 22 ++++++++++++++++----
 3 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index eb54c75..800f57c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -52,6 +52,17 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
         REJECT, DISCARD
     }
 
+    enum BehaviourOnUnknownDeclareArgument
+    {
+        IGNORE, LOG, FAIL
+    }
+
+    String UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME = "exchange.behaviourOnUnknownDeclareArgument";
+    @ManagedContextDefault(name= UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME)
+    BehaviourOnUnknownDeclareArgument
+            ON_UNKNOWN_EXCHANGE_DECLARE_OPTION = BehaviourOnUnknownDeclareArgument.FAIL;
+
+
     // Attributes
 
     @ManagedAttribute(description = "Provides an alternate destination that, depending on behaviour requested by the "
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d97f1c6..06c1690 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -938,7 +938,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
-                    validateExchangeDeclareArguments(attributes, session.getAMQPConnection().getModel());
+                    validateAndSanitizeExchangeDeclareArguments(attributes, session.getAMQPConnection());
                     addressSpace.createMessageDestination(Exchange.class, attributes);;
                 }
                 catch(ReservedExchangeNameException e)
@@ -997,8 +997,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         }
     }
 
-    private void validateExchangeDeclareArguments(final Map<String, Object> attributes, final Model model)
+    private void validateAndSanitizeExchangeDeclareArguments(final Map<String, Object> attributes, final AMQPConnection_0_10 connection)
     {
+        final Model model = connection.getModel();
         final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
         final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
         typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
@@ -1007,11 +1008,24 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                                                   .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
                                                                                                 && !a.isDerived()))
                                                   .collect(Collectors.toSet());
-
         if (!unsupported.isEmpty())
         {
-            throw new IllegalArgumentException(String.format(
-                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            Exchange.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour =
+                    connection.getContextValue(Exchange.BehaviourOnUnknownDeclareArgument.class,
+                                               Exchange.UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME);
+            switch(unknownArgumentBehaviour)
+            {
+                case LOG:
+                    LOGGER.warn("Unsupported exchange declare arguments : {}", String.join(",", unsupported));
+                    // fall through
+                case IGNORE:
+                    attributes.keySet().removeAll(unsupported);
+                    break;
+                case FAIL:
+                default:
+                    throw new IllegalArgumentException(String.format(
+                            "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            }
         }
     }
 
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 1a1789e..c783687 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2641,7 +2641,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         attributes.put(Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
-                    validateExchangeDeclareArguments(attributes);
+                    validateAndSanitizeExchangeDeclareArguments(attributes);
                     exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
 
                     if (!nowait)
@@ -2718,7 +2718,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    private void validateExchangeDeclareArguments(final Map<String, Object> attributes)
+    private void validateAndSanitizeExchangeDeclareArguments(final Map<String, Object> attributes)
     {
         final ConfiguredObjectTypeRegistry typeRegistry = getModel().getTypeRegistry();
         final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
@@ -2731,8 +2731,22 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
         if (!unsupported.isEmpty())
         {
-            throw new IllegalArgumentException(String.format(
-                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            Exchange.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour =
+                    getConnection().getContextValue(Exchange.BehaviourOnUnknownDeclareArgument.class,
+                                                    Exchange.UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME);
+            switch(unknownArgumentBehaviour)
+            {
+                case LOG:
+                    LOGGER.warn("Unsupported exchange declare arguments : {}", String.join(",", unsupported));
+                    // fall through
+                case IGNORE:
+                    attributes.keySet().removeAll(unsupported);
+                    break;
+                case FAIL:
+                default:
+                    throw new IllegalArgumentException(String.format(
+                        "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/04: QPID-8381: [Broker-J] Upgrade Oracle BDB JE dependency to version 7.5.11

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 8a31ba33cc913417efdf18dfaf2d3df2409b9180
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sat Nov 23 23:35:57 2019 +0000

    QPID-8381: [Broker-J] Upgrade Oracle BDB JE dependency to version 7.5.11
    
    (cherry picked from commit d5ae00b256914fb8af0ead9a2362f9476c59218c)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 7717184..a655c9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@
     <dollar.sign>$</dollar.sign>
     <at.sign>@</at.sign>
 
-    <bdb-version>7.4.5</bdb-version>
+    <bdb-version>7.5.11</bdb-version>
     <derby-version>10.13.1.1</derby-version>
     <logback-version>1.2.3</logback-version>
     <guava-version>27.0-jre</guava-version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/04: QPID-8378: [Broker-J] Make sure that message reference is released for deleted node

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit f1742d3e9a0c58720a333be944f91be5f68ae0ce
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Nov 14 23:13:46 2019 +0000

    QPID-8378: [Broker-J] Make sure that message reference is released for deleted node
    
    (cherry picked from commit f17f16aded492953984ece42d90afd52ab44408a)
---
 .../apache/qpid/server/queue/AbstractQueue.java    |  8 +---
 .../qpid/server/queue/AbstractQueueTestBase.java   | 53 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 22e42d4..cc6608d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1714,15 +1714,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null && !node.isDeleted())
+            if(reference != null)
             {
                 try
                 {
-                    if (!reference.getMessage().checkValid())
-                    {
-                        malformedEntry(node);
-                    }
-                    else if (visitor.visit(node))
+                    if (!node.isDeleted() && reference.getMessage().checkValid() && visitor.visit(node))
                     {
                         break;
                     }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 4fcea19..0a4f987 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
@@ -1257,6 +1258,58 @@ abstract class AbstractQueueTestBase extends UnitTestBase
                      _queue.getQueueDepthMessages());
     }
 
+    @Test
+    public void testVisit()
+    {
+        final ServerMessage message = createMessage(1L, 2, 3);
+        _queue.enqueue(message, null, null);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        _queue.visit(visitor);
+
+        final ArgumentCaptor<QueueEntry> argument = ArgumentCaptor.forClass(QueueEntry.class);
+        verify(visitor).visit(argument.capture());
+
+        final QueueEntry queueEntry = argument.getValue();
+        assertEquals(message, queueEntry.getMessage());
+        verify(message.newReference()).release();
+    }
+
+    @Test
+    public void testVisitWhenNodeDeletedAfterAdvance()
+    {
+        final QueueEntryList list = mock(QueueEntryList.class);
+
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+
+        @SuppressWarnings("unchecked")
+        final Queue queue = new AbstractQueue(attributes, _virtualHost)
+        {
+            @Override
+            QueueEntryList getEntries()
+            {
+                return list;
+            }
+
+        };
+
+        final MessageReference reference = mock(MessageReference.class);
+        final QueueEntry entry = mock(QueueEntry.class);
+        when(entry.isDeleted()).thenReturn(true);
+        when(entry.newMessageReference()).thenReturn(reference);
+        final QueueEntryIterator iterator = mock(QueueEntryIterator.class);
+        when(iterator.advance()).thenReturn(true, false);
+        when(iterator.getNode()).thenReturn(entry);
+        when(list.iterator()).thenReturn(iterator);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        queue.visit(visitor);
+        verifyNoMoreInteractions(visitor);
+        verify(reference).release();
+    }
+
     private void makeVirtualHostTargetSizeExceeded()
     {
         final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 04/04: QPID-8377 : add test to verify IGNORE behaviour

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit cdde19cafecca628ff9d12063eb28d362431235b
Author: Robert Godfrey <rg...@apache.org>
AuthorDate: Tue Nov 19 11:57:11 2019 +0100

    QPID-8377 : add test to verify IGNORE behaviour
    
    (cherry picked from commit 8a2a02521a76f889e23d89f67ff025e73b578df5)
---
 .../ExchangeDeclareInvalidOptionBehaviourTest.java | 68 ++++++++++++++++++++++
 1 file changed, 68 insertions(+)

diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeDeclareInvalidOptionBehaviourTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeDeclareInvalidOptionBehaviourTest.java
new file mode 100644
index 0000000..71de695
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeDeclareInvalidOptionBehaviourTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.exchange;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "exchange.behaviourOnUnknownDeclareArgument", value = "IGNORE")
+public class ExchangeDeclareInvalidOptionBehaviourTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_EXCHANGE = "testExchange";
+
+    @Test
+    public void exchangeDeclareInvalidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP)).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel()
+                       .open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .declareName(TEST_EXCHANGE)
+                       .declareArguments(Collections.singletonMap("foo", "bar"))
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org