You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2013/04/26 16:25:53 UTC
svn commit: r1476219 - in /qpid/trunk/qpid:
java/broker/src/main/java/org/apache/qpid/server/model/adapter/
tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/
Author: fadams
Date: Fri Apr 26 14:25:52 2013
New Revision: 1476219
URL: http://svn.apache.org/r1476219
Log:
QPID-4760: Associate Java Broker QueueAdapter and SessionAdapter via ConsumerAdapter
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/trunk/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Fri Apr 26 14:25:52 2013
@@ -68,20 +68,37 @@ final class ConnectionAdapter extends Ab
{
if(!actualSessions.contains(session))
{
- _sessionAdapters.remove(session);
+ SessionAdapter adapter = _sessionAdapters.remove(session);
+ childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback.
}
}
for(AMQSessionModel session : actualSessions)
{
if(!_sessionAdapters.containsKey(session))
{
- _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor()));
+ SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor());
+ _sessionAdapters.put(session, adapter);
+ childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback.
}
}
return new ArrayList<Session>(_sessionAdapters.values());
}
}
+ /**
+ * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter.
+ */
+ SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ synchronized (_sessionAdapters)
+ {
+ getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions.
+ return _sessionAdapters.get(session);
+ }
+ }
+
public void delete()
{
try
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Fri Apr 26 14:25:52 2013
@@ -37,9 +37,11 @@ public class ConsumerAdapter extends Abs
{
private final Subscription _subscription;
private final QueueAdapter _queue;
+ private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
- public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
+ public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
+ final Subscription subscription)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
@@ -48,6 +50,7 @@ public class ConsumerAdapter extends Abs
subscription.getConsumerName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
+ _session = sessionAdapter;
_statistics = new ConsumerStatistics();
//TODO
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Fri Apr 26 14:25:52 2013
@@ -34,13 +34,17 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.MapValueConverter;
@@ -91,6 +95,38 @@ final class QueueAdapter extends Abstrac
_queue.setNotificationListener(this);
}
+ /**
+ * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel.
+ * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost
+ * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter or null if it can't be found.
+ */
+ private SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter.
+ AMQConnectionModel connectionKey = session.getConnectionModel();
+
+ // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want.
+ ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey);
+ if (connectionAdapter == null)
+ {
+ return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause.
+ }
+ else
+ { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for.
+ SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session);
+ if (sessionAdapter == null)
+ {
+ return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up.
+ }
+ else
+ {
+ return sessionAdapter;
+ }
+ }
+ }
+
private void populateConsumers()
{
Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
@@ -102,7 +138,13 @@ final class QueueAdapter extends Abstrac
{
if(!_consumerAdapters.containsKey(subscription))
{
- _consumerAdapters.put(subscription, new ConsumerAdapter(this, subscription));
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
}
@@ -571,9 +613,13 @@ final class QueueAdapter extends Abstrac
{
if(!_consumerAdapters.containsKey(subscription))
{
- adapter = new ConsumerAdapter(this, subscription);
- _consumerAdapters.put(subscription,adapter);
- // TODO - register with session
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
if(adapter != null)
@@ -589,10 +635,14 @@ final class QueueAdapter extends Abstrac
synchronized (_consumerAdapters)
{
adapter = _consumerAdapters.remove(subscription);
- // TODO - register with session
}
if(adapter != null)
{
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ if (sessionAdapter != null)
+ { // Unregister ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionUnregistered(subscription);
+ }
childRemoved(adapter);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Fri Apr 26 14:25:52 2013
@@ -21,8 +21,10 @@
package org.apache.qpid.server.model.adapter;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -34,6 +36,7 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -44,6 +47,7 @@ final class SessionAdapter extends Abstr
private AMQSessionModel _session;
private SessionStatistics _statistics;
+ private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -54,7 +58,10 @@ final class SessionAdapter extends Abstr
public Collection<Consumer> getSubscriptions()
{
- return null; //TODO
+ synchronized (_consumerAdapters)
+ {
+ return new ArrayList<Consumer>(_consumerAdapters.values());
+ }
}
public Collection<Publisher> getPublishers()
@@ -111,6 +118,37 @@ final class SessionAdapter extends Abstr
return 0; //TODO
}
+ /**
+ * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * @param adapter the registered ConsumerAdapter.
+ */
+ void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ {
+ synchronized (_consumerAdapters)
+ {
+ _consumerAdapters.put(subscription, adapter);
+ }
+ childAdded(adapter);
+ }
+
+ /**
+ * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ */
+ void subscriptionUnregistered(Subscription subscription)
+ {
+ ConsumerAdapter adapter = null;
+ synchronized (_consumerAdapters)
+ {
+ adapter = _consumerAdapters.remove(subscription);
+ }
+ if (adapter != null)
+ {
+ childRemoved(adapter);
+ }
+ }
+
@Override
public Collection<String> getAttributeNames()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Apr 26 14:25:52 2013
@@ -214,6 +214,19 @@ public final class VirtualHostAdapter ex
}
+ /**
+ * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost.
+ * @param connection the AMQConnectionModel used to index the ConnectionAdapter.
+ * @return the requested ConnectionAdapter.
+ */
+ ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
+ {
+ synchronized (_connectionAdapters)
+ {
+ return _connectionAdapters.get(connection);
+ }
+ }
+
public Collection<Queue> getQueues()
{
synchronized(_queueAdapters)
@@ -644,6 +657,10 @@ public final class VirtualHostAdapter ex
if(adapter != null)
{
+ // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any
+ // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters.
+ adapter.getSessions();
+
childRemoved(adapter);
}
}
Modified: qpid/trunk/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java?rev=1476219&r1=1476218&r2=1476219&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java (original)
+++ qpid/trunk/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java Fri Apr 26 14:25:52 2013
@@ -169,6 +169,8 @@ public class QmfManagementAgent implemen
_agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSubscribeSchema());
_agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getUnsubscribeSchema());
+ _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Session.getSchema());
+
// Initialise QmfAgentData Objects and track changes to the broker Management Objects.
registerConfigurationChangeListeners();
}
@@ -176,10 +178,12 @@ public class QmfManagementAgent implemen
catch (QmfException qmfe)
{
_log.info("QmfException {} caught in QmfManagementAgent Constructor", qmfe.getMessage());
+ _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
}
catch (Exception e)
{
_log.info("Exception {} caught in QmfManagementAgent Constructor", e.getMessage());
+ _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
}
}
@@ -214,7 +218,7 @@ public class QmfManagementAgent implemen
for (VirtualHost vhost : _broker.getVirtualHosts())
{
// We don't add QmfAgentData VirtualHost objects. Possibly TODO, but it's a bit awkward at the moment
- // becase (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line
+ // because (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line
// tools such as qpid-config don't appear to be VirtualHost aware. A way to stay compatible is to
// mark queues, exchanges etc with [vhost:<vhost-name>/]<object-name> (see Constructor comments).
vhost.addChangeListener(this);
@@ -227,7 +231,6 @@ public class QmfManagementAgent implemen
{
childAdded(connection, session);
- // session.getSubscriptions() returns null in Qpid 0.23 TODO fix that.
if (session.getSubscriptions() != null)
{
for (Consumer subscription : session.getSubscriptions())
@@ -330,7 +333,7 @@ public class QmfManagementAgent implemen
* QMF2 Management Object if one doesn't already exist. In most cases it's a one-to-one mapping, but for
* Binding for example the Binding child is added to both Queue and Exchange so we only create the Binding
* QMF2 Management Object once and add the queueRef and exchangeRef reference properties referencing the Queue
- * and Exchange parent Objects respectively.
+ * and Exchange parent Objects respectively, Similarly for Consumer (AKA Subscription).
* <p>
* This method is also responsible for raising the appropriate QMF2 Events when Management Objects are created.
* @param object the parent object that the child is being added to.
@@ -362,8 +365,16 @@ public class QmfManagementAgent implemen
agentConnection = false; // Only ignore the first Connection, which is the one from the Agent.
}
else if (child instanceof Session)
- { // TODO
-
+ {
+ if (!_objects.containsKey(child))
+ {
+ QmfAgentData ref = _objects.get(object); // Get the Connection QmfAgentData so we can get connectionRef.
+ if (ref != null)
+ {
+ data = new org.apache.qpid.server.qmf2.agentdata.Session((Session)child, ref.getObjectId());
+ _objects.put(child, data);
+ }
+ }
}
else if (child instanceof Exchange)
{
@@ -449,17 +460,15 @@ public class QmfManagementAgent implemen
{
subscription.setQueueRef(ref.getObjectId(), (Queue)object);
// Raise a Subscribe Event - N.B. Need to do it *after* we've set the queueRef.
- _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Subscription)data).createSubscribeEvent());
+ _agent.raiseEvent(subscription.createSubscribeEvent());
}
- else if (object instanceof Session) // Won't get called in Qpid 0.20.
- { // TODO the association between Session and Subscription isn't implemented in the 0.20 Java Broker.
- //System.out.println("subscription.setSessionRef");
+ else if (object instanceof Session)
+ {
subscription.setSessionRef(ref.getObjectId());
}
}
}
-
try
{
// If we've created new QmfAgentData we register it with the Agent.
@@ -503,8 +512,8 @@ public class QmfManagementAgent implemen
_agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientDisconnectEvent());
}
else if (child instanceof Session)
- { // TODO
-
+ {
+ // no-op, don't need to do anything specific when Session is removed.
}
else if (child instanceof Exchange)
{
@@ -588,5 +597,4 @@ public class QmfManagementAgent implemen
}
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org