You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/12/15 21:46:11 UTC
svn commit: r1720243 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-core/src/main...
Author: rgodfrey
Date: Tue Dec 15 20:46:10 2015
New Revision: 1720243
URL: http://svn.apache.org/viewvc?rev=1720243&view=rev
Log:
QPID-6954 : Add the ability to define "policies" for node auto-creation based on address
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java (with props)
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java
qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Tue Dec 15 20:46:10 2015
@@ -103,6 +103,9 @@ public class BDBHAReplicaVirtualHostImpl
@ManagedAttributeField
private List<String> _globalAddressDomains;
+ @ManagedAttributeField
+ private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
@ManagedObjectFactoryConstructor
public BDBHAReplicaVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -239,6 +242,12 @@ public class BDBHAReplicaVirtualHostImpl
}
+    /**
+     * Always reports an empty, immutable list of node auto-creation policies; the value of the
+     * {@code _nodeAutoCreationPolicies} attribute field is not consulted here.
+     *
+     * @return an empty list, never {@code null}
+     */
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return Collections.<NodeAutoCreationPolicy>emptyList();
+    }
+
+ @Override
public int getConnectionThreadPoolSize()
{
return 0;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Dec 15 20:46:10 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
@ManagedObject( defaultType = "ProvidedStore", description = VirtualHost.CLASS_DESCRIPTION)
@@ -74,6 +75,7 @@ public interface VirtualHost<X extends V
String NUMBER_OF_SELECTORS = "numberOfSelectors";
String CONNECTION_THREAD_POOL_SIZE = "connectionThreadPoolSize";
String CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "connectionThreadPoolKeepAliveTimeout";
+ String NODE_AUTO_CREATION_POLICIES = "nodeAutoCreationPolicies";
@ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
public static final String VIRTUALHOST_WORK_DIR = VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
@@ -125,6 +127,9 @@ public interface VirtualHost<X extends V
@ManagedAttribute( defaultValue = "${virtualhost.housekeepingThreadCount}")
int getHousekeepingThreadCount();
+ @ManagedAttribute( defaultValue = "[]" ) // managed attribute; the declared default is the empty list literal "[]"
+ List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies(); // policies consulted when an address does not resolve to an existing node
+
String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
@SuppressWarnings("unused")
@ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Dec 15 20:46:10 2015
@@ -42,6 +42,8 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
import javax.security.auth.Subject;
@@ -193,6 +195,9 @@ public abstract class AbstractVirtualHos
@ManagedAttributeField
private List<String> _globalAddressDomains;
+ @ManagedAttributeField
+ private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
private boolean _useAsyncRecoverer;
private MessageDestination _defaultDestination;
@@ -255,6 +260,13 @@ public abstract class AbstractVirtualHos
validateGlobalAddressDomain(domain);
}
}
+ if(getNodeAutoCreationPolicies() != null)
+ {
+ for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+ {
+ validateNodeAutoCreationPolicy(policy);
+ }
+ }
validateConnectionThreadPoolSettings(this);
}
@@ -276,6 +288,17 @@ public abstract class AbstractVirtualHos
}
}
}
+ if(changedAttributes.contains(NODE_AUTO_CREATION_POLICIES))
+ {
+ if(getNodeAutoCreationPolicies() != null)
+ {
+ for(NodeAutoCreationPolicy policy : getNodeAutoCreationPolicies())
+ {
+ validateNodeAutoCreationPolicy(policy);
+ }
+ }
+
+ }
if (changedAttributes.contains(CONNECTION_THREAD_POOL_SIZE) || changedAttributes.contains(NUMBER_OF_SELECTORS))
{
@@ -283,6 +306,58 @@ public abstract class AbstractVirtualHos
}
}
+    /**
+     * Checks that a node auto-creation policy is usable before it is accepted on this virtual host.
+     * <p>
+     * A policy is valid when its pattern is a compilable Java regular expression, its node type names
+     * (case-insensitively, by simple class name) one of the child types the model reports for this
+     * object's category, it is enabled for consume and/or publish, and the node type implements
+     * {@link MessageSource} when created on consume and {@link MessageDestination} when created on publish.
+     *
+     * @param policy the policy to check
+     * @throws IllegalArgumentException if the pattern or node type is missing or invalid, or if the
+     *                                  consume/publish flags are inconsistent with the node type
+     */
+    private void validateNodeAutoCreationPolicy(final NodeAutoCreationPolicy policy)
+    {
+        final String pattern = policy.getPattern();
+        if (pattern == null)
+        {
+            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be supplied");
+        }
+
+        try
+        {
+            Pattern.compile(pattern);
+        }
+        catch (PatternSyntaxException e)
+        {
+            // Chain the cause so the description and index of the syntax error are not lost
+            throw new IllegalArgumentException("The 'pattern' attribute of a NodeAutoCreationPolicy MUST be a valid "
+                                               + "Java Regular Expression Pattern, the value '" + pattern + "' is not", e);
+        }
+
+        final String nodeType = policy.getNodeType();
+        if (nodeType == null)
+        {
+            // Fail with a meaningful message rather than a NullPointerException from trim() below
+            throw new IllegalArgumentException("The 'nodeType' attribute of a NodeAutoCreationPolicy MUST be supplied");
+        }
+
+        // Resolve the node type name to a child class of this category; the first match wins
+        final String nodeTypeName = nodeType.trim();
+        Class<? extends ConfiguredObject> nodeClass = null;
+        for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
+        {
+            if (childClass.getSimpleName().equalsIgnoreCase(nodeTypeName))
+            {
+                nodeClass = childClass;
+                break;
+            }
+        }
+        if (nodeClass == null)
+        {
+            throw new IllegalArgumentException("The node type of a NodeAutoCreationPolicy must be a valid child type "
+                                               + "of a VirtualHost, '" + nodeType + "' is not.");
+        }
+
+        if (policy.isCreatedOnConsume() && !MessageSource.class.isAssignableFrom(nodeClass))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on consume must have a "
+                                               + "nodeType which implements MessageSource, '" + nodeType + "' does not.");
+        }
+        if (policy.isCreatedOnPublish() && !MessageDestination.class.isAssignableFrom(nodeClass))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPolicy which creates nodes on publish must have a "
+                                               + "nodeType which implements MessageDestination, '" + nodeType + "' does not.");
+        }
+
+        // A policy that fires on neither trigger could never create anything
+        if (!(policy.isCreatedOnConsume() || policy.isCreatedOnPublish()))
+        {
+            throw new IllegalArgumentException("A NodeAutoCreationPolicy must create on consume, create on publish or both.");
+        }
+    }
+
private void validateGlobalAddressDomain(final String name)
{
String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
@@ -661,6 +736,12 @@ public abstract class AbstractVirtualHos
}
+    /**
+     * Exposes the node auto-creation policies held in the managed attribute field.
+     *
+     * @return the field value exactly as stored (no copy is made); validation code in this class
+     *         null-checks the result, so callers should not assume it is non-null
+     */
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return this._nodeAutoCreationPolicies;
+    }
+
+ @Override
public Queue<?> getAttainedQueue(String name)
{
Queue<?> child = awaitChildClassToAttainState(Queue.class, name);
@@ -684,10 +765,110 @@ public abstract class AbstractVirtualHos
@Override
public MessageSource getAttainedMessageSource(final String name)
{
- MessageSource systemSource = _systemNodeSources.get(name);
- return systemSource == null ? (MessageSource) awaitChildClassToAttainState(Queue.class, name) : systemSource;
+ MessageSource messageSource = _systemNodeSources.get(name); // resolution order: system node sources first
+ if(messageSource == null)
+ {
+ messageSource = awaitChildClassToAttainState(Queue.class, name); // then a Queue child with this name
+ }
+ if(messageSource == null)
+ {
+ messageSource = autoCreateSource(name); // last resort: try the node auto-creation policies
+ }
+ return messageSource; // still null when nothing matched and nothing could be auto-created
}
+    /**
+     * Attempts to create a message source called {@code name} from the create-on-consume policies.
+     *
+     * @param name the address for which a source is required
+     * @return the newly created source, or {@code null} if no policy applied or every attempt failed
+     */
+    private MessageSource autoCreateSource(final String name)
+    {
+        return autoCreateNode(name, MessageSource.class, false);
+    }
+
+    /**
+     * Attempts to create a message destination called {@code name} from the create-on-publish policies.
+     *
+     * @param name the address for which a destination is required
+     * @return the newly created destination, or {@code null} if no policy applied or every attempt failed
+     */
+    private MessageDestination autoCreateDestination(final String name)
+    {
+        return autoCreateNode(name, MessageDestination.class, true);
+    }
+
+    /**
+     * Shared implementation of node auto-creation. Policies are tried in list order; the first one that is
+     * enabled for the trigger, whose pattern matches {@code name}, and whose node type resolves to a child
+     * class implementing {@code requiredType} is used to create the node. A failed attempt is logged and
+     * the remaining policies are still tried.
+     *
+     * @param name         the name to match against policy patterns and to give to the created node
+     * @param requiredType the interface the created node must implement
+     * @param onPublish    {@code true} to use create-on-publish policies, {@code false} for create-on-consume
+     * @return the created node, or {@code null} if nothing was created
+     */
+    private <T> T autoCreateNode(final String name, final Class<T> requiredType, final boolean onPublish)
+    {
+        final List<NodeAutoCreationPolicy> policies = getNodeAutoCreationPolicies();
+        if (policies == null)
+        {
+            // Validation in this class tolerates an unset policy list, so creation must as well
+            return null;
+        }
+
+        for (NodeAutoCreationPolicy policy : policies)
+        {
+            // Test the cheap flag before the regex: String.matches recompiles the pattern on every call
+            final boolean enabled = onPublish ? policy.isCreatedOnPublish() : policy.isCreatedOnConsume();
+            if (!enabled || !name.matches(policy.getPattern()))
+            {
+                continue;
+            }
+
+            final Class<? extends ConfiguredObject> nodeClass = findChildTypeForPolicy(policy.getNodeType(), requiredType);
+            if (nodeClass == null)
+            {
+                continue;
+            }
+
+            // Work on a copy so the policy's own attribute map is never modified
+            final Map<String, Object> attributes = new HashMap<>();
+            if (policy.getAttributes() != null)
+            {
+                attributes.putAll(policy.getAttributes());
+            }
+            // Every created node needs its own identity: drop any fixed id and use the requested name
+            attributes.remove(ConfiguredObject.ID);
+            attributes.put(ConfiguredObject.NAME, name);
+
+            try
+            {
+                final T created = requiredType.cast(doSync(addChildAsync(nodeClass, attributes)));
+                if (created != null)
+                {
+                    return created;
+                }
+            }
+            catch (RuntimeException e)
+            {
+                // Best effort: report the failure and fall through to any later policy
+                _logger.info("Unable to auto create a node named {} due to exception", name, e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Finds the child type of this object's category whose simple class name equals {@code nodeType}
+     * (ignoring case and surrounding whitespace) and which implements {@code requiredType}.
+     *
+     * @return the first matching class, or {@code null} if {@code nodeType} is null or nothing matches
+     */
+    private Class<? extends ConfiguredObject> findChildTypeForPolicy(final String nodeType, final Class<?> requiredType)
+    {
+        if (nodeType == null)
+        {
+            return null;
+        }
+        final String nodeTypeName = nodeType.trim();
+        for (Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
+        {
+            if (childClass.getSimpleName().equalsIgnoreCase(nodeTypeName) && requiredType.isAssignableFrom(childClass))
+            {
+                return childClass;
+            }
+        }
+        return null;
+    }
+
+
@Override
public Queue<?> getAttainedQueue(UUID id)
{
@@ -757,7 +938,19 @@ public abstract class AbstractVirtualHos
public MessageDestination getAttainedMessageDestination(final String name)
{
MessageDestination destination = _systemNodeDestinations.get(name);
- return destination == null ? getAttainedExchange(name) : destination;
+ if(destination == null) // not a system node destination
+ {
+ destination = getAttainedExchange(name); // next, look for an exchange with this name
+ }
+ if(destination == null)
+ {
+ destination = getAttainedQueue(name); // then fall back to a queue with this name
+ }
+ if(destination == null)
+ {
+ destination = autoCreateDestination(name); // last resort: try the node auto-creation policies
+ }
+ return destination; // still null when nothing matched and nothing could be auto-created
}
@Override
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java?rev=1720243&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java Tue Dec 15 20:46:10 2015
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ManagedAttributeValue;
+import org.apache.qpid.server.model.ManagedAttributeValueType;
+
+@ManagedAttributeValueType
+public interface NodeAutoCreationPolicy extends ManagedAttributeValue // describes when and how a virtual host may create a missing node
+{
+ String getPattern(); // Java regular expression; the whole requested node name must match it (String.matches)
+
+ boolean isCreatedOnPublish(); // true if the policy applies when a message destination is looked up
+
+ boolean isCreatedOnConsume(); // true if the policy applies when a message source is looked up
+
+ String getNodeType(); // simple class name of the child type to create; compared trimmed and case-insensitively
+
+ Map<String,Object> getAttributes(); // attributes for the created node; the virtual host copies them, drops the id and sets the name
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NodeAutoCreationPolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Tue Dec 15 20:46:10 2015
@@ -103,6 +103,9 @@ class RedirectingVirtualHostImpl
@ManagedAttributeField
private List<String> _globalAddressDomains;
+ @ManagedAttributeField
+ private List<NodeAutoCreationPolicy> _nodeAutoCreationPolicies;
+
@ManagedObjectFactoryConstructor
public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -239,6 +242,12 @@ class RedirectingVirtualHostImpl
}
+    /**
+     * Always reports an empty, immutable list of node auto-creation policies; the value of the
+     * {@code _nodeAutoCreationPolicies} attribute field is not consulted here.
+     *
+     * @return an empty list, never {@code null}
+     */
     @Override
+    public List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies()
+    {
+        return Collections.<NodeAutoCreationPolicy>emptyList();
+    }
+
+ @Override
public int getConnectionThreadPoolSize()
{
return 0;
Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/Hello.java Tue Dec 15 20:46:10 2015
@@ -61,7 +61,7 @@ public class Hello
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = (Destination) context.lookup("topicExchange");
+ Destination destination = (Destination) context.lookup("testDestination");
MessageProducer messageProducer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties?rev=1720243&r1=1720242&r2=1720243&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties Tue Dec 15 20:46:10 2015
@@ -25,3 +25,5 @@ connectionfactory.qpidConnectionfactory
# Register an AMQP destination in JNDI
# destination.[jndiName] = [Address Format]
destination.topicExchange = amq.topic
+destination.testDestination = rob.exchange.foo
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org