You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/19 11:28:06 UTC
[qpid-broker-j] branch master updated: QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push:
new 00f976b QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
00f976b is described below
commit 00f976b1fcc6db515cd3c0a0e4b22bc3a34edfec
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Dec 16 22:39:35 2018 +0000
QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
---
.../java/org/apache/qpid/server/model/Session.java | 7 +++
.../apache/qpid/server/queue/AbstractQueue.java | 4 +-
.../apache/qpid/server/session/AMQPSession.java | 5 +-
.../qpid/server/session/AbstractAMQPSession.java | 16 +++++-
.../v6_1/category/SessionController.java | 60 +++++++---------------
.../v6_1/category/DestinationControllerTest.java | 1 -
.../v6_1/category/SessionControllerTest.java | 26 ++++------
7 files changed, 56 insertions(+), 63 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index ccb396a..af1084e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.model;
+import java.util.Set;
+
@ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
public interface Session<X extends Session<X>> extends ConfiguredObject<X>
{
@@ -58,4 +60,9 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
long getUnacknowledgedMessages();
+
+ @ManagedOperation(nonModifying = true,
+ changesConfiguredObjectState = false,
+ skipAclCheck = true)
+ Set<? extends Consumer<?, ?>> getConsumers();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index ba98f1a..84a4700 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1050,7 +1050,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- session.incConsumerCount();
+ session.consumerAdded(consumer);
addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
@@ -1058,7 +1058,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
if (child.equals(consumer))
{
- session.decConsumerCount();
+ session.consumerRemoved(consumer);
removeChangeListener(this);
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index d68592b..4603e22 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -60,9 +61,9 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
@Override
long getConsumerCount();
- void incConsumerCount();
+ void consumerAdded(Consumer<?, X> consumer);
- void decConsumerCount();
+ void consumerRemoved(Consumer<?, X> consumer);
/**
* Return the time the current transaction started.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index db69dd9..7256444 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -21,11 +21,13 @@
package org.apache.qpid.server.session;
import java.security.AccessControlContext;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +49,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
@@ -77,6 +80,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
private Iterator<AbstractConsumerTarget> _processPendingIterator;
+ private final Set<Consumer<?,X>> _consumers = ConcurrentHashMap.newKeySet();
protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
{
@@ -256,15 +260,23 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
}
@Override
- public final void incConsumerCount()
+ public final void consumerAdded(Consumer<?, X> consumer)
{
_consumerCount.incrementAndGet();
+ _consumers.add(consumer);
}
@Override
- public final void decConsumerCount()
+ public final void consumerRemoved(Consumer<?, X> consumer)
{
_consumerCount.decrementAndGet();
+ _consumers.remove(consumer);
+ }
+
+ @Override
+ public Set<? extends Consumer<?, ?>> getConsumers()
+ {
+ return Collections.unmodifiableSet(_consumers);
}
protected abstract void updateBlockedStateIfNecessary();
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
index 5177b1e..a279e7c 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.management.plugin.controller.v6_1.category;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
import java.util.Set;
-import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.qpid.server.management.plugin.ManagementException;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
import org.apache.qpid.server.management.plugin.controller.GenericLegacyConfiguredObject;
import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
import org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
@@ -60,52 +62,28 @@ public class SessionController extends LegacyCategoryController
}
@Override
- @SuppressWarnings("unchecked")
public Collection<LegacyConfiguredObject> getChildren(final String category)
{
if (ConsumerController.TYPE.equalsIgnoreCase(category))
{
final LegacyConfiguredObject nextVersionSession = getNextVersionLegacyConfiguredObject();
- final LegacyConfiguredObject connection =
- nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
- final LegacyConfiguredObject vh = connection.getParent(VirtualHostController.TYPE);
- final UUID sessionID = (UUID) getAttribute(ID);
- final UUID connectionID = (UUID) connection.getAttribute(ID);
- final List<LegacyConfiguredObject> consumers = new ArrayList<>();
- final Collection<LegacyConfiguredObject> queues = vh.getChildren(QueueController.TYPE);
- if (queues != null)
+ final ManagementResponse result =
+ nextVersionSession.invoke("getConsumers", Collections.emptyMap(), true);
+ if (result != null && result.getResponseCode() == 200 && result.getType() == ResponseType.MODEL_OBJECT)
{
- queues.forEach(q -> {
- final Collection<LegacyConfiguredObject> queueConsumers =
- q.getChildren(ConsumerController.TYPE);
- if (queueConsumers != null)
- {
- queueConsumers.stream()
- .filter(c -> sameSession(c, sessionID, connectionID))
- .map(c -> getManagementController().convertFromNextVersion(c))
- .forEach(consumers::add);
- }
- });
+ final Object objects = result.getBody();
+ if (objects instanceof Collection)
+ {
+ return ((Collection<?>) objects).stream().filter(o -> o instanceof LegacyConfiguredObject)
+ .map(o -> (LegacyConfiguredObject)o)
+ .map(o -> getManagementController().convertFromNextVersion(o))
+ .collect(Collectors.toList());
+ }
}
- return consumers;
+ throw ManagementException.createInternalServerErrorManagementException(
+ "Unexpected result of performing operation Session#getConsumers()");
}
return super.getChildren(category);
}
-
- private boolean sameSession(final LegacyConfiguredObject consumer,
- final UUID sessionID,
- final UUID connectionID)
- {
- LegacyConfiguredObject session = (LegacyConfiguredObject) consumer.getAttribute("session");
- if (session != null)
- {
- if (sessionID.equals(session.getAttribute(ID)))
- {
- LegacyConfiguredObject con = session.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
- return con != null && connectionID.equals(con.getAttribute(ID));
- }
- }
- return false;
- }
}
-}
\ No newline at end of file
+}
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
index 4c76ccf..d075217 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
import java.util.Arrays;
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
index 77b5924..06cc42b 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +36,9 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.management.plugin.ManagementController;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
+import org.apache.qpid.server.management.plugin.controller.ControllerManagementResponse;
import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
import org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
import org.apache.qpid.test.utils.UnitTestBase;
@@ -58,34 +62,26 @@ public class SessionControllerTest extends UnitTestBase
public void convertNextVersionLegacyConfiguredObject()
{
final UUID sessionID = UUID.randomUUID();
- final UUID connectionID = UUID.randomUUID();
final LegacyConfiguredObject nextVersionSession = mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionConnection = mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionVirtualHost = mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionQueue = mock(LegacyConfiguredObject.class);
final LegacyConfiguredObject nextVersionConsumer = mock(LegacyConfiguredObject.class);
when(nextVersionSession.getCategory()).thenReturn(SessionController.TYPE);
- when(nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION)).thenReturn(nextVersionConnection);
when(nextVersionSession.getAttribute(LegacyConfiguredObject.ID)).thenReturn(sessionID);
- when(nextVersionConnection.getParent(VirtualHostController.TYPE)).thenReturn(nextVersionVirtualHost);
- when(nextVersionConnection.getAttribute(LegacyConfiguredObject.ID)).thenReturn(connectionID);
-
- when(nextVersionVirtualHost.getChildren(QueueController.TYPE)).thenReturn(Collections.singletonList(nextVersionQueue));
- when(nextVersionQueue.getChildren(ConsumerController.TYPE)).thenReturn(Collections.singletonList(nextVersionConsumer));
- when(nextVersionConsumer.getAttribute("session")).thenReturn(nextVersionSession);
+ final ManagementResponse operationResult = new ControllerManagementResponse(ResponseType.MODEL_OBJECT,
+ Collections.singletonList(
+ nextVersionConsumer));
+ when(nextVersionSession.invoke(eq("getConsumers"), eq(Collections.emptyMap()), eq(true))).thenReturn(
+ operationResult);
final LegacyConfiguredObject convertedConsumer = mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject convertedConnection = mock(LegacyConfiguredObject.class);
when(_legacyManagementController.convertFromNextVersion(nextVersionConsumer)).thenReturn(convertedConsumer);
- when(_legacyManagementController.convertFromNextVersion(nextVersionConnection)).thenReturn(convertedConnection);
- final LegacyConfiguredObject convertedSession = _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
+ final LegacyConfiguredObject convertedSession =
+ _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
assertThat(convertedSession.getAttribute(LegacyConfiguredObject.ID), is(equalTo(sessionID)));
- assertThat(convertedSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION), is(equalTo(convertedConnection)));
final Collection<LegacyConfiguredObject> consumers = convertedSession.getChildren(ConsumerController.TYPE);
assertThat(consumers, is(notNullValue()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org