You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by va...@apache.org on 2023/02/03 07:28:14 UTC

[qpid-broker-j] branch main updated: QPID-8618: [Broker-J] ACL check on link stealing (#169)

This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bc1974012 QPID-8618: [Broker-J] ACL check on link stealing (#169)
1bc1974012 is described below

commit 1bc1974012e11204dea12f3f675785bb61797537
Author: Daniil Kirilyuk <da...@gmail.com>
AuthorDate: Fri Feb 3 08:28:08 2023 +0100

    QPID-8618: [Broker-J] ACL check on link stealing (#169)
---
 .../apache/qpid/server/protocol/v1_0/LinkImpl.java |  16 +-
 .../qpid/server/protocol/v1_0/LinkImplTest.java    | 235 +++++++++++++++++++++
 2 files changed, 250 insertions(+), 1 deletion(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
index 692faa6f97..cabcf9b133 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -34,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -47,6 +49,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.util.Action;
 
 public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Link_1_0<S, T>
@@ -94,7 +97,18 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin
 
             if (_linkEndpoint != null && !session.equals(_linkEndpoint.getSession()))
             {
-                SettableFuture<LinkEndpoint<S, T>> future = SettableFuture.create();
+                if (!Objects.equals(_linkEndpoint.getSession().getConnection().getPrincipal(),
+                        session.getConnection().getPrincipal()))
+                {
+                    final Operation operation = attach.getRole() == Role.SENDER
+                            ? Operation.PERFORM_ACTION("publish")
+                            : Operation.PERFORM_ACTION("consume");
+                    final ConfiguredObject<?> targetObject = _linkEndpoint instanceof SendingLinkEndpoint
+                            ? (ConfiguredObject<?>) ((SendingLinkEndpoint) _linkEndpoint).getDestination().getMessageSource()
+                            : (ConfiguredObject<?>) session.getReceivingDestination(this, (Target) getTarget()).getMessageDestination();
+                    targetObject.authorise(operation);
+                }
+                final SettableFuture<LinkEndpoint<S, T>> future = SettableFuture.create();
                 _thiefQueue.add(new ThiefInformation(session, attach, future));
                 startLinkStealingIfNecessary();
                 return future;
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java
new file mode 100644
index 0000000000..96cd1d3db7
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LinkImplTest
+{
+    private final static String REMOTE_CONTAINER_ID = "remote-container-id";
+    private final static String LINK_NAME = "link-name";
+
+    private LinkRegistry<Source, Target> _linkRegistry;
+
+    @Before
+    public void setUp()
+    {
+        _linkRegistry = mock(LinkRegistry.class);
+    }
+
+    @Test
+    public void linkStealing_PublishToQueue() throws Exception
+    {
+        final Queue<?> queue = mock(Queue.class);
+        final Session_1_0 session1 = createSession("principal1", queue);
+        final Session_1_0 session2 = createSession("principal2", queue);
+
+        final Attach attach1 = createAttach(Role.SENDER);
+        final Attach attach2 = createAttach(Role.SENDER);
+
+        final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.RECEIVER, _linkRegistry);
+
+        final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1);
+        final LinkEndpoint<?, ?> linkEndpoint = future.get();
+        assertTrue(linkEndpoint instanceof StandardReceivingLinkEndpoint);
+
+        link.attach(session2, attach2);
+        verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("publish")));
+    }
+
+    @Test
+    public void linkStealing_ConsumeQueue() throws Exception
+    {
+        final Queue<?> queue = mock(Queue.class);
+        final Session_1_0 session1 = createSession("principal1", queue);
+        final Session_1_0 session2 = createSession("principal2", queue);
+
+        final Attach attach1 = createAttach(Role.RECEIVER);
+        final Attach attach2 = createAttach(Role.RECEIVER);
+
+        final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.SENDER, _linkRegistry);
+
+        final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1);
+        final LinkEndpoint<?, ?> linkEndpoint = future.get();
+        assertTrue(linkEndpoint instanceof SendingLinkEndpoint);
+
+        link.attach(session2, attach2);
+        verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("consume")));
+    }
+
+    @Test
+    public void linkStealing_PublishToExchange() throws Exception
+    {
+        final Exchange<?> exchange = mock(Exchange.class);
+        final Queue<?> queue = mock(Queue.class);
+        final Session_1_0 session1 = createSession("principal1", exchange, queue);
+        final Session_1_0 session2 = createSession("principal2", exchange, queue);
+
+        final Attach attach1 = createAttach(Role.SENDER);
+        final Attach attach2 = createAttach(Role.SENDER);
+
+        final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.RECEIVER, _linkRegistry);
+
+        final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1);
+        final LinkEndpoint<?, ?> linkEndpoint = future.get();
+        assertTrue(linkEndpoint instanceof StandardReceivingLinkEndpoint);
+
+        link.attach(session2, attach2);
+        verify(exchange, times(1)).authorise(eq(Operation.PERFORM_ACTION("publish")));
+    }
+
+    @Test
+    public void linkStealing_ConsumeExchange() throws Exception
+    {
+        final Exchange<?> exchange = mock(Exchange.class);
+        final Queue<?> queue = mock(Queue.class);
+        final Session_1_0 session1 = createSession("principal1", exchange, queue);
+        final Session_1_0 session2 = createSession("principal2", exchange, queue);
+
+        final Attach attach1 = createAttach(Role.RECEIVER);
+        final Attach attach2 = createAttach(Role.RECEIVER);
+
+        final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.SENDER, _linkRegistry);
+
+        final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1);
+        final LinkEndpoint<?, ?> linkEndpoint = future.get();
+        assertTrue(linkEndpoint instanceof SendingLinkEndpoint);
+
+        link.attach(session2, attach2);
+        verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("consume")));
+    }
+
+    private Session_1_0 createSession(final String principal,
+                                      final MessageDestination messageDestination) throws Exception
+    {
+        return createSession(principal, messageDestination, null);
+    }
+
+    private Session_1_0 createSession(final String principal,
+                                      final MessageDestination messageDestination,
+                                      final MessageDestination backup) throws Exception
+    {
+        final MessageStore messageStore = mock(MessageStore.class);
+
+        final NamedAddressSpace addressSpace = mock(NamedAddressSpace.class);
+        when(addressSpace.getMessageStore()).thenReturn(messageStore);
+
+        final SectionDecoderRegistry sectionDecoderRegistry = mock(SectionDecoderRegistry.class);
+
+        final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = mock(AMQPDescribedTypeRegistry.class);
+        when(amqpDescribedTypeRegistry.getSectionDecoderRegistry()).thenReturn(sectionDecoderRegistry);
+
+        final AMQPConnection<?> amqpConnection = mock(AMQPConnection.class);
+        when(amqpConnection.getContextValue(eq(Long.class), eq(Consumer.SUSPEND_NOTIFICATION_PERIOD)))
+                .thenReturn(10_000L);
+
+        final AMQPConnection_1_0<?> connection = mock(AMQPConnection_1_0.class);
+        when(connection.getDescribedTypeRegistry()).thenReturn(amqpDescribedTypeRegistry);
+        when(connection.getPrincipal()).thenReturn(principal);
+        when(connection.getAddressSpace()).thenReturn(addressSpace);
+
+        final DeliveryRegistry deliveryRegistry = mock(DeliveryRegistry.class);
+
+        final ReceivingDestination receivingDestination = mock(ReceivingDestination.class);
+        when(receivingDestination.getCapabilities()).thenReturn(new Symbol[] { });
+        when(receivingDestination.getMessageDestination()).thenReturn(messageDestination);
+
+        final SendingDestination sendingDestination = messageDestination instanceof Exchange
+                ? mock(ExchangeSendingDestination.class)
+                : mock(StandardSendingDestination.class);
+        when(sendingDestination.getCapabilities()).thenReturn(new Symbol[] { });
+
+        final MessageInstanceConsumer consumer = mock(MessageInstanceConsumer.class);
+
+        if (messageDestination instanceof Exchange)
+        {
+            final Queue<?> queue = (Queue<?>) backup;
+            when(queue.addConsumer(any(),any(), eq(Message_1_0.class), any(), any(), any()))
+                    .thenReturn(consumer);
+
+            when(sendingDestination.getMessageSource()).thenReturn(queue);
+        }
+
+        if (messageDestination instanceof Queue)
+        {
+            final Queue<?> queue = (Queue<?>) messageDestination;
+            when(queue.addConsumer(any(),any(), eq(Message_1_0.class), any(), any(), any()))
+                    .thenReturn(consumer);
+            when(sendingDestination.getMessageSource()).thenReturn(queue);
+        }
+
+        final Session_1_0 session = mock(Session_1_0.class);
+        doReturn(connection).when(session).getConnection();
+        doReturn(amqpConnection).when(session).getAMQPConnection();
+        when(session.getIncomingDeliveryRegistry()).thenReturn(deliveryRegistry);
+        when(session.getOutgoingDeliveryRegistry()).thenReturn(deliveryRegistry);
+        when(session.getReceivingDestination(any(LinkImpl.class), any(Target.class))).thenReturn(receivingDestination);
+        when(session.getSendingDestination(any(LinkImpl.class), any(Source.class))).thenReturn(sendingDestination);
+
+        return session;
+    }
+
+    private Attach createAttach(final Role role)
+    {
+        final Source source = mock(Source.class);
+        when(source.getDistributionMode()).thenReturn(StdDistMode.COPY);
+
+        final Target target = mock(Target.class);
+
+        final Attach attach = mock(Attach.class);
+        when(attach.getRole()).thenReturn(role);
+        when(attach.getSource()).thenReturn(source);
+        when(attach.getTarget()).thenReturn(target);
+        when(attach.getInitialDeliveryCount()).thenReturn(UnsignedInteger.ZERO);
+
+        return attach;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org