You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/12/15 21:46:11 UTC

svn commit: r1720243 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-core/src/main...

Author: rgodfrey
Date: Tue Dec 15 20:46:10 2015
New Revision: 1720243

URL: http://svn.apache.org/viewvc?rev=1720243&view=rev
Log:
QPID-6954 : Add the ability to define "policies" for node auto-creation based on address

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java   (with props)
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java
    qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Tue Dec 15 20:46:10 2015
@@ -103,6 +103,9 @@ public class BDBHAReplicaVirtualHostImpl
     @ManagedAttributeField
     private List<String> _globalAddressDomains;
 
+    @ManagedAttributeField
+    private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
     @ManagedObjectFactoryConstructor
     public BDBHAReplicaVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
     {
@@ -239,6 +242,12 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return Collections.emptyList();
+    }
+
+    @Override
     public int getConnectionThreadPoolSize()
     {
         return 0;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Dec 15 20:46:10 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
 import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
 
 @ManagedObject( defaultType = "ProvidedStore", description = VirtualHost.CLASS_DESCRIPTION)
@@ -74,6 +75,7 @@ public interface VirtualHost<X extends V
     String NUMBER_OF_SELECTORS                  = "numberOfSelectors";
     String CONNECTION_THREAD_POOL_SIZE          = "connectionThreadPoolSize";
     String CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "connectionThreadPoolKeepAliveTimeout";
+    String NODE_AUTO_CREATION_POLICIES = "nodeAutoCreationPolicies";
 
     @ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
     public static final String VIRTUALHOST_WORK_DIR = VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
@@ -125,6 +127,9 @@ public interface VirtualHost<X extends V
     @ManagedAttribute( defaultValue = "${virtualhost.housekeepingThreadCount}")
     int getHousekeepingThreadCount();
 
+    @ManagedAttribute( defaultValue = "[]" )
+    List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies();
+
     String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
     @SuppressWarnings("unused")
     @ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Dec 15 20:46:10 2015
@@ -42,6 +42,8 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 
 import javax.security.auth.Subject;
 
@@ -193,6 +195,9 @@ public abstract class AbstractVirtualHos
     @ManagedAttributeField
     private List<String> _globalAddressDomains;
 
+    @ManagedAttributeField
+    private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
     private boolean _useAsyncRecoverer;
 
     private MessageDestination _defaultDestination;
@@ -255,6 +260,13 @@ public abstract class AbstractVirtualHos
                 validateGlobalAddressDomain(domain);
             }
         }
+        if(getNodeAutoCreationPolicies() != null)
+        {
+            for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+            {
+                validateNodeAutoCreationPolicy(policy);
+            }
+        }
 
         validateConnectionThreadPoolSettings(this);
     }
@@ -276,6 +288,17 @@ public abstract class AbstractVirtualHos
                 }
             }
         }
+        if(changedAttributes.contains(NODE_AUTO_CREATION_POLICIES))
+        {
+            if(getNodeAutoCreationPolicies() != null)
+            {
+                for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+                {
+                    validateNodeAutoCreationPolicy(policy);
+                }
+            }
+
+        }
 
         if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
         {
@@ -283,6 +306,58 @@ public abstract class AbstractVirtualHos
         }
     }
 
+    private void validateNodeAutoCreationPolicy(final NodeAutoCreationPolicy policy)
+    {
+        String pattern = policy.getPattern();
+        if(pattern == null)
+        {
+            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPattern MUST be supplied");
+        }
+
+        try
+        {
+            Pattern.compile(pattern);
+        }
+        catch (PatternSyntaxException e)
+        {
+            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPattern MUST be a valid "
+                                               + "Java Regular Expression Pattern, the value '" + pattern + "' is not");
+
+        }
+
+        String nodeType = policy.getNodeType();
+        Class<? extends ConfiguredObject> sourceClass = null;
+        for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
+        {
+            if (childClass.getSimpleName().equalsIgnoreCase(nodeType.trim()))
+            {
+                sourceClass = childClass;
+                break;
+            }
+        }
+        if(sourceClass == null)
+        {
+            throw new IllegalArgumentException("The node type of a NodeAutoCreationPattern must be a valid child type "
+                                               + "of a VirtualHost, '" + nodeType + "' is not.");
+        }
+        if(policy.isCreatedOnConsume() && !MessageSource.class.isAssignableFrom(sourceClass))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPattern which creates nodes on consume must have a "
+                                               + "nodeType which implements MessageSource, '" + nodeType + "' does not.");
+        }
+
+        if(policy.isCreatedOnPublish() && !MessageDestination.class.isAssignableFrom(sourceClass))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPattern which creates nodes on publish must have a "
+                                               + "nodeType which implements MessageDestination, '" + nodeType + "' does not.");
+        }
+        if(!(policy.isCreatedOnConsume() || policy.isCreatedOnPublish()))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPattern must create on consume, create on publish or both.");
+        }
+
+    }
+
     private void validateGlobalAddressDomain(final String name)
     {
         String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
@@ -661,6 +736,12 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return _nodeAutoCreationPolicies;
+    }
+
+    @Override
     public Queue<?> getAttainedQueue(String name)
     {
         Queue<?> child = awaitChildClassToAttainState(Queue.class, name);
@@ -684,10 +765,110 @@ public abstract class AbstractVirtualHos
     @Override
     public MessageSource getAttainedMessageSource(final String name)
     {
-        MessageSource systemSource = _systemNodeSources.get(name);
-        return systemSource == null ? (MessageSource) awaitChildClassToAttainState(Queue.class, name) : systemSource;
+        MessageSource messageSource = _systemNodeSources.get(name);
+        if(messageSource == null)
+        {
+            messageSource = awaitChildClassToAttainState(Queue.class, name);
+        }
+        if(messageSource == null)
+        {
+            messageSource = autoCreateSource(name);
+        }
+        return messageSource;
     }
 
+    private MessageSource autoCreateSource(final String name)
+    {
+        for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+        {
+            String pattern = policy.getPattern();
+            if(name.matches(pattern) && policy.isCreatedOnConsume())
+            {
+                String nodeType = policy.getNodeType();
+                Class<? extends ConfiguredObject> sourceClass = null;
+                for(Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
+                {
+                    if(childClass.getSimpleName().equalsIgnoreCase(nodeType.trim()) && MessageSource.class.isAssignableFrom(childClass))
+                    {
+                        sourceClass = childClass;
+                    }
+                }
+                if(sourceClass != null)
+                {
+                    Map<String, Object> attributes = new HashMap<>(policy.getAttributes());
+                    attributes.remove(ConfiguredObject.ID);
+                    attributes.put(ConfiguredObject.NAME, name);
+
+                    try
+                    {
+
+                        final MessageSource messageSource =
+                                (MessageSource) doSync(addChildAsync(sourceClass, attributes));
+                        if (messageSource != null)
+                        {
+                            return messageSource;
+                        }
+                    }
+                    catch (RuntimeException e)
+                    {
+                        _logger.info("Unable to auto create a node named {} due to exception", name, e);
+                    }
+
+                }
+            }
+
+        }
+        return null;
+    }
+
+
+    private MessageDestination autoCreateDestination(final String name)
+    {
+        for (NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+        {
+            String pattern = policy.getPattern();
+            if (name.matches(pattern) && policy.isCreatedOnPublish())
+            {
+                String nodeType = policy.getNodeType();
+                Class<? extends ConfiguredObject> sourceClass = null;
+                for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
+                {
+                    if (childClass.getSimpleName().equalsIgnoreCase(nodeType.trim())
+                        && MessageDestination.class.isAssignableFrom(childClass))
+                    {
+                        sourceClass = childClass;
+                    }
+                }
+                if (sourceClass != null)
+                {
+                    Map<String, Object> attributes = new HashMap<>(policy.getAttributes());
+                    attributes.remove(ConfiguredObject.ID);
+                    attributes.put(ConfiguredObject.NAME, name);
+
+                    try
+                    {
+
+                        final MessageDestination messageDestination =
+                                (MessageDestination) doSync(addChildAsync(sourceClass, attributes));
+                        if (messageDestination != null)
+                        {
+                            return messageDestination;
+                        }
+                    }
+                    catch (RuntimeException e)
+                    {
+                        _logger.info("Unable to auto create a node named {} due to exception", name, e);
+                    }
+
+                }
+            }
+
+        }
+        return null;
+
+    }
+
+
     @Override
     public Queue<?> getAttainedQueue(UUID id)
     {
@@ -757,7 +938,19 @@ public abstract class AbstractVirtualHos
     public MessageDestination getAttainedMessageDestination(final String name)
     {
         MessageDestination destination = _systemNodeDestinations.get(name);
-        return destination == null ? getAttainedExchange(name) : destination;
+        if(destination == null)
+        {
+            destination = getAttainedExchange(name);
+        }
+        if(destination == null)
+        {
+            destination = getAttainedQueue(name);
+        }
+        if(destination == null)
+        {
+            destination = autoCreateDestination(name);
+        }
+        return destination;
     }
 
     @Override

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java?rev=1720243&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java Tue Dec 15 20:46:10 2015
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ManagedAttributeValue;
+import org.apache.qpid.server.model.ManagedAttributeValueType;
+
+@ManagedAttributeValueType
+public interface NodeAutoCreationPolicy extends ManagedAttributeValue
+{
+    String getPattern();
+
+    boolean isCreatedOnPublish();
+
+    boolean isCreatedOnConsume();
+
+    String getNodeType();
+
+    Map<String,Object> getAttributes();
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Tue Dec 15 20:46:10 2015
@@ -103,6 +103,9 @@ class RedirectingVirtualHostImpl
     @ManagedAttributeField
     private List<String> _globalAddressDomains;
 
+    @ManagedAttributeField
+    private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
     @ManagedObjectFactoryConstructor
     public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
     {
@@ -239,6 +242,12 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return Collections.emptyList();
+    }
+
+    @Override
     public int getConnectionThreadPoolSize()
     {
         return 0;

Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java Tue Dec 15 20:46:10 2015
@@ -61,7 +61,7 @@ public class Hello
             connection.start();
 
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Destination destination = (Destination) context.lookup("topicExchange");
+            Destination destination = (Destination) context.lookup("testDestination");
 
             MessageProducer messageProducer = session.createProducer(destination);
             MessageConsumer messageConsumer = session.createConsumer(destination);

Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties Tue Dec 15 20:46:10 2015
@@ -25,3 +25,5 @@ connectionfactory.qpidConnectionfactory
 # Register an AMQP destination in JNDI
 # destination.[jniName] = [Address Format]
 destination.topicExchange = amq.topic
+destination.testDestination = rob.exchange.foo
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org