You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 17:37:23 UTC

svn commit: r1564825 - in /qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server: message/ plugin/ virtualhost/

Author: rgodfrey
Date: Wed Feb  5 16:37:22 2014
New Revision: 1564825

URL: http://svn.apache.org/r1564825
Log:
Allow plugins to define system nodes

Added:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1564825&r1=1564824&r2=1564825&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Wed Feb  5 16:37:22 2014
@@ -23,7 +23,7 @@ package org.apache.qpid.server.message;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
-public interface MessageDestination
+public interface MessageDestination extends MessageNode
 {
 
     public String getName();

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java?rev=1564825&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java Wed Feb  5 16:37:22 2014
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+/**
+ * Common supertype of {@link MessageDestination} and {@link MessageSource}:
+ * anything that can be identified by a name.
+ */
+public interface MessageNode
+{
+    /**
+     * @return the name identifying this node
+     */
+    String getName();
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1564825&r1=1564824&r2=1564825&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Wed Feb  5 16:37:22 2014
@@ -32,7 +32,7 @@ import org.apache.qpid.server.store.Tran
 import java.util.Collection;
 import java.util.EnumSet;
 
-public interface MessageSource extends TransactionLogResource
+public interface MessageSource extends TransactionLogResource, MessageNode
 {
     Consumer addConsumer(ConsumerTarget target, FilterManager filters,
                          Class<? extends ServerMessage> messageClass,

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java?rev=1564825&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java Wed Feb  5 16:37:22 2014
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import org.apache.qpid.server.message.MessageNode;
+
+/**
+ * Extension point through which a plugin contributes system nodes to a virtual host.
+ * The host passes its {@link SystemNodeRegistry} to {@link #register(SystemNodeRegistry)}.
+ */
+public interface SystemNodeCreator extends Pluggable
+{
+    /**
+     * Invoked by the virtual host with its registry.
+     *
+     * @param registry where this creator adds (and may later remove) its nodes
+     */
+    void register(SystemNodeRegistry registry);
+
+    /**
+     * Callback surface the virtual host exposes to creators.
+     */
+    interface SystemNodeRegistry
+    {
+        /**
+         * @param node the node to make available on the virtual host
+         */
+        void registerSystemNode(MessageNode node);
+
+        /**
+         * @param node the node to withdraw from the virtual host
+         */
+        void removeSystemNode(MessageNode node);
+    }
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1564825&r1=1564824&r2=1564825&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Feb  5 16:37:22 2014
@@ -48,9 +48,12 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
@@ -101,6 +104,7 @@ public abstract class AbstractVirtualHos
 
     private final DtxRegistry _dtxRegistry;
     private final AMQQueueFactory _queueFactory;
+    private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
 
     private volatile State _state = State.INITIALISING;
 
@@ -109,6 +113,13 @@ public abstract class AbstractVirtualHos
     private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
     private boolean _blocked;
 
+    private final Map<String, MessageDestination> _systemNodeDestinations =
+            Collections.synchronizedMap(new HashMap<String,MessageDestination>());
+
+    private final Map<String, MessageSource> _systemNodeSources =
+            Collections.synchronizedMap(new HashMap<String,MessageSource>());
+
+
     public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
                                StatisticsGatherer brokerStatisticsGatherer,
                                SecurityManager parentSecurityManager,
@@ -151,6 +162,8 @@ public abstract class AbstractVirtualHos
 
         _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
 
+        registerSystemNodes();
+
         initialiseStatistics();
 
         initialiseStorage(hostConfig, virtualHost);
@@ -159,6 +172,16 @@ public abstract class AbstractVirtualHos
         getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
     }
 
+    private void registerSystemNodes()
+    {
+        // Hand this host's registry to every SystemNodeCreator the service loader provides.
+        for(SystemNodeCreator nodeCreator
+                : new QpidServiceLoader<SystemNodeCreator>().instancesOf(SystemNodeCreator.class))
+        {
+            nodeCreator.register(_systemNodeRegistry);
+        }
+    }
+
     abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
                                               org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
 
@@ -445,7 +468,8 @@ public abstract class AbstractVirtualHos
     @Override
     public MessageSource getMessageSource(final String name)
     {
-        return getQueue(name);
+        final MessageSource systemNode = _systemNodeSources.get(name);
+        return systemNode != null ? systemNode : getQueue(name); // system node wins over a queue
     }
 
     @Override
@@ -536,7 +560,8 @@ public abstract class AbstractVirtualHos
     @Override
     public MessageDestination getMessageDestination(final String name)
     {
-        return getExchange(name);
+        final MessageDestination systemNode = _systemNodeDestinations.get(name);
+        return systemNode != null ? systemNode : getExchange(name); // system node wins over an exchange
     }
 
     @Override
@@ -942,4 +967,48 @@ public abstract class AbstractVirtualHos
             }
         }
     }
+
+    /**
+     * Registry handed to each {@link SystemNodeCreator} so that it can add nodes to,
+     * or remove nodes from, this virtual host's system node maps.
+     * <p>
+     * A node is filed by {@link MessageNode#getName()} under every role it implements:
+     * as a destination, as a source, or as both. Nodes implementing neither role are ignored.
+     */
+    private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
+    {
+        /**
+         * Adds the node to the destination and/or source map under its name, replacing
+         * any node previously registered in that map with the same name.
+         */
+        @Override
+        public void registerSystemNode(final MessageNode node)
+        {
+            if(node instanceof MessageDestination)
+            {
+                _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
+            }
+            if(node instanceof MessageSource)
+            {
+                _systemNodeSources.put(node.getName(), (MessageSource)node);
+            }
+        }
+
+        /**
+         * Removes whatever is registered under the node's name from the map of each
+         * role the node implements.
+         */
+        @Override
+        public void removeSystemNode(final MessageNode node)
+        {
+            if(node instanceof MessageDestination)
+            {
+                _systemNodeDestinations.remove(node.getName());
+            }
+            if(node instanceof MessageSource)
+            {
+                _systemNodeSources.remove(node.getName());
+            }
+        }
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1564825&r1=1564824&r2=1564825&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb  5 16:37:22 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.LinkRegistry;
@@ -80,6 +81,7 @@ public interface VirtualHost extends Dur
     void removeExchange(Exchange exchange, boolean force) throws AMQException;
 
     MessageDestination getMessageDestination(String name);
+
     Exchange getExchange(String name);
     Exchange getExchange(UUID id);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org