You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/19 11:28:06 UTC

[qpid-broker-j] branch master updated: QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f976b  QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
00f976b is described below

commit 00f976b1fcc6db515cd3c0a0e4b22bc3a34edfec
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Dec 16 22:39:35 2018 +0000

    QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
---
 .../java/org/apache/qpid/server/model/Session.java |  7 +++
 .../apache/qpid/server/queue/AbstractQueue.java    |  4 +-
 .../apache/qpid/server/session/AMQPSession.java    |  5 +-
 .../qpid/server/session/AbstractAMQPSession.java   | 16 +++++-
 .../v6_1/category/SessionController.java           | 60 +++++++---------------
 .../v6_1/category/DestinationControllerTest.java   |  1 -
 .../v6_1/category/SessionControllerTest.java       | 26 ++++------
 7 files changed, 56 insertions(+), 63 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index ccb396a..af1084e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.model;
 
+import java.util.Set;
+
 @ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
 public interface Session<X extends Session<X>> extends ConfiguredObject<X>
 {
@@ -58,4 +60,9 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
 
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
     long getUnacknowledgedMessages();
+
+    @ManagedOperation(nonModifying = true,
+            changesConfiguredObjectState = false,
+            skipAclCheck = true)
+    Set<? extends Consumer<?, ?>> getConsumers();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index ba98f1a..84a4700 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1050,7 +1050,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
 
-        session.incConsumerCount();
+        session.consumerAdded(consumer);
         addChangeListener(new AbstractConfigurationChangeListener()
         {
             @Override
@@ -1058,7 +1058,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             {
                 if (child.equals(consumer))
                 {
-                    session.decConsumerCount();
+                    session.consumerRemoved(consumer);
                     removeChangeListener(this);
                 }
             }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index d68592b..4603e22 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -60,9 +61,9 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
     @Override
     long getConsumerCount();
 
-    void incConsumerCount();
+    void consumerAdded(Consumer<?, X> consumer);
 
-    void decConsumerCount();
+    void consumerRemoved(Consumer<?, X> consumer);
 
     /**
      * Return the time the current transaction started.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index db69dd9..7256444 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -21,11 +21,13 @@
 package org.apache.qpid.server.session;
 
 import java.security.AccessControlContext;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,6 +49,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
@@ -77,6 +80,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
 
     protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
     private Iterator<AbstractConsumerTarget> _processPendingIterator;
+    private final Set<Consumer<?,X>> _consumers = ConcurrentHashMap.newKeySet();
 
     protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
     {
@@ -256,15 +260,23 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     }
 
     @Override
-    public final void incConsumerCount()
+    public final void consumerAdded(Consumer<?, X> consumer)
     {
         _consumerCount.incrementAndGet();
+        _consumers.add(consumer);
     }
 
     @Override
-    public final void decConsumerCount()
+    public final void consumerRemoved(Consumer<?, X> consumer)
     {
         _consumerCount.decrementAndGet();
+        _consumers.remove(consumer);
+    }
+
+    @Override
+    public Set<? extends Consumer<?, ?>> getConsumers()
+    {
+        return Collections.unmodifiableSet(_consumers);
     }
 
     protected abstract void updateBlockedStateIfNecessary();
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
index 5177b1e..a279e7c 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.management.plugin.controller.v6_1.category;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
 import java.util.Set;
-import java.util.UUID;
+import java.util.stream.Collectors;
 
+import org.apache.qpid.server.management.plugin.ManagementException;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
 import org.apache.qpid.server.management.plugin.controller.GenericLegacyConfiguredObject;
 import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
 import org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
@@ -60,52 +62,28 @@ public class SessionController extends LegacyCategoryController
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public Collection<LegacyConfiguredObject> getChildren(final String category)
         {
             if (ConsumerController.TYPE.equalsIgnoreCase(category))
             {
                 final LegacyConfiguredObject nextVersionSession = getNextVersionLegacyConfiguredObject();
-                final LegacyConfiguredObject connection =
-                        nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
-                final LegacyConfiguredObject vh = connection.getParent(VirtualHostController.TYPE);
-                final UUID sessionID = (UUID) getAttribute(ID);
-                final UUID connectionID = (UUID) connection.getAttribute(ID);
-                final List<LegacyConfiguredObject> consumers = new ArrayList<>();
-                final Collection<LegacyConfiguredObject> queues = vh.getChildren(QueueController.TYPE);
-                if (queues != null)
+                final ManagementResponse result =
+                        nextVersionSession.invoke("getConsumers", Collections.emptyMap(), true);
+                if (result != null && result.getResponseCode() == 200  && result.getType() == ResponseType.MODEL_OBJECT)
                 {
-                    queues.forEach(q -> {
-                        final Collection<LegacyConfiguredObject> queueConsumers =
-                                q.getChildren(ConsumerController.TYPE);
-                        if (queueConsumers != null)
-                        {
-                            queueConsumers.stream()
-                                          .filter(c -> sameSession(c, sessionID, connectionID))
-                                          .map(c -> getManagementController().convertFromNextVersion(c))
-                                          .forEach(consumers::add);
-                        }
-                    });
+                    final Object objects = result.getBody();
+                    if (objects instanceof Collection)
+                    {
+                        return ((Collection<?>) objects).stream().filter(o -> o instanceof LegacyConfiguredObject)
+                                                        .map(o -> (LegacyConfiguredObject)o)
+                                                        .map(o -> getManagementController().convertFromNextVersion(o))
+                                                        .collect(Collectors.toList());
+                    }
                 }
-                return consumers;
+                throw ManagementException.createInternalServerErrorManagementException(
+                        "Unexpected result of performing operation Session#getConsumers()");
             }
             return super.getChildren(category);
         }
-
-        private boolean sameSession(final LegacyConfiguredObject consumer,
-                                    final UUID sessionID,
-                                    final UUID connectionID)
-        {
-            LegacyConfiguredObject session = (LegacyConfiguredObject) consumer.getAttribute("session");
-            if (session != null)
-            {
-                if (sessionID.equals(session.getAttribute(ID)))
-                {
-                    LegacyConfiguredObject con = session.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
-                    return con != null && connectionID.equals(con.getAttribute(ID));
-                }
-            }
-            return false;
-        }
     }
-}
\ No newline at end of file
+}
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
index 4c76ccf..d075217 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
index 77b5924..06cc42b 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,6 +36,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.management.plugin.ManagementController;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
+import org.apache.qpid.server.management.plugin.controller.ControllerManagementResponse;
 import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
 import org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
 import org.apache.qpid.test.utils.UnitTestBase;
@@ -58,34 +62,26 @@ public class SessionControllerTest extends UnitTestBase
     public void convertNextVersionLegacyConfiguredObject()
     {
         final UUID sessionID = UUID.randomUUID();
-        final UUID connectionID = UUID.randomUUID();
 
         final LegacyConfiguredObject nextVersionSession = mock(LegacyConfiguredObject.class);
-        final LegacyConfiguredObject nextVersionConnection = mock(LegacyConfiguredObject.class);
-        final LegacyConfiguredObject nextVersionVirtualHost = mock(LegacyConfiguredObject.class);
-        final LegacyConfiguredObject nextVersionQueue = mock(LegacyConfiguredObject.class);
         final LegacyConfiguredObject nextVersionConsumer = mock(LegacyConfiguredObject.class);
 
         when(nextVersionSession.getCategory()).thenReturn(SessionController.TYPE);
-        when(nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION)).thenReturn(nextVersionConnection);
         when(nextVersionSession.getAttribute(LegacyConfiguredObject.ID)).thenReturn(sessionID);
 
-        when(nextVersionConnection.getParent(VirtualHostController.TYPE)).thenReturn(nextVersionVirtualHost);
-        when(nextVersionConnection.getAttribute(LegacyConfiguredObject.ID)).thenReturn(connectionID);
-
-        when(nextVersionVirtualHost.getChildren(QueueController.TYPE)).thenReturn(Collections.singletonList(nextVersionQueue));
-        when(nextVersionQueue.getChildren(ConsumerController.TYPE)).thenReturn(Collections.singletonList(nextVersionConsumer));
-        when(nextVersionConsumer.getAttribute("session")).thenReturn(nextVersionSession);
+        final ManagementResponse operationResult = new ControllerManagementResponse(ResponseType.MODEL_OBJECT,
+                                                                                    Collections.singletonList(
+                                                                                            nextVersionConsumer));
+        when(nextVersionSession.invoke(eq("getConsumers"), eq(Collections.emptyMap()), eq(true))).thenReturn(
+                operationResult);
 
         final LegacyConfiguredObject convertedConsumer = mock(LegacyConfiguredObject.class);
-        final LegacyConfiguredObject convertedConnection = mock(LegacyConfiguredObject.class);
         when(_legacyManagementController.convertFromNextVersion(nextVersionConsumer)).thenReturn(convertedConsumer);
-        when(_legacyManagementController.convertFromNextVersion(nextVersionConnection)).thenReturn(convertedConnection);
 
-        final LegacyConfiguredObject convertedSession = _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
+        final LegacyConfiguredObject convertedSession =
+                _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
 
         assertThat(convertedSession.getAttribute(LegacyConfiguredObject.ID), is(equalTo(sessionID)));
-        assertThat(convertedSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION), is(equalTo(convertedConnection)));
 
         final Collection<LegacyConfiguredObject> consumers = convertedSession.getChildren(ConsumerController.TYPE);
         assertThat(consumers, is(notNullValue()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org