You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/16 23:57:08 UTC
svn commit: r1301814 - in
/qpid/branches/java-config-and-management/qpid/java:
broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/
broker/src/main/java/org/apache/qpid/server/model/
broker/src/main/java/org/apache/qpid/server/model/adap...
Author: rgodfrey
Date: Fri Mar 16 22:57:07 2012
New Revision: 1301814
URL: http://svn.apache.org/viewvc?rev=1301814&view=rev
Log:
NO-JIRA : [Java Config] added ability to create Bindings through new API, and via JMX
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java Fri Mar 16 22:57:07 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Bind
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
@@ -47,6 +48,7 @@ import javax.management.openmbean.Tabula
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,6 +79,7 @@ public class ExchangeMBean extends AMQMa
HEADERS_COMPOSITE_ITEM_DESC.toArray(new String[HEADERS_COMPOSITE_ITEM_DESC.size()]);
private static final String[] HEADERS_TABULAR_UNIQUE_INDEX_ARRAY =
HEADERS_TABULAR_UNIQUE_INDEX.toArray(new String[HEADERS_TABULAR_UNIQUE_INDEX.size()]);
+ public static final String HEADERS_EXCHANGE_TYPE = "headers";
static
{
@@ -166,7 +169,7 @@ public class ExchangeMBean extends AMQMa
public TabularData bindings() throws IOException, JMException
{
- if("headers".equals(_exchange.getExchangeType()))
+ if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
{
return getHeadersBindings(_exchange.getBindings());
}
@@ -254,11 +257,44 @@ public class ExchangeMBean extends AMQMa
return bindingList;
}
- public void createNewBinding(
- @MBeanOperationParameter(name = ManagedQueue.TYPE, description = "Queue name") String queueName,
- @MBeanOperationParameter(name = "Binding", description = "New binding") String binding) throws JMException
+ public void createNewBinding(String queueName, String binding) throws JMException
{
- // TODO
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+
+ if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
+ {
+ final String[] bindings = binding.split(",");
+ for (int i = 0; i < bindings.length; i++)
+ {
+ final String[] keyAndValue = bindings[i].split("=");
+ if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+ {
+ throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+ }
+
+ if(keyAndValue.length == 1)
+ {
+ //no value was given, only a key. Use an empty value to signal match on key presence alone
+ arguments.put(keyAndValue[0], "");
+ }
+ else
+ {
+ arguments.put(keyAndValue[0], keyAndValue[1]);
+ }
+ }
+ }
+
+ Queue queue = null;
+ VirtualHost vhost = _exchange.getParent(VirtualHost.class);
+ for(Queue aQueue : vhost.getQueues())
+ {
+ if(aQueue.getName().equals(queueName))
+ {
+ queue = aQueue;
+ break;
+ }
+ }
+ _exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP);
}
public void removeBinding(
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java Fri Mar 16 22:57:07 2012
@@ -21,6 +21,7 @@
package org.apache.qpid.server.model;
import java.util.Collection;
+import java.util.Map;
public interface Exchange extends ConfiguredObject
{
@@ -33,6 +34,12 @@ public interface Exchange extends Config
//children
Collection<Binding> getBindings();
Collection<Publisher> getPublishers();
+
+ //operations
+ Binding createBinding(String bindingKey,
+ Queue queue,
+ Map<String,Object> bindingArguments,
+ Map<String, Object> attributes);
// Statistics
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Fri Mar 16 22:57:07 2012
@@ -27,12 +27,18 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
{
@@ -48,6 +54,8 @@ final class ExchangeAdapter extends Abst
{
_vhost = virtualHostAdapter;
_exchange = exchange;
+ addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
+
exchange.addBindingListener(this);
populateBindings();
}
@@ -92,6 +100,46 @@ final class ExchangeAdapter extends Abst
return Collections.emptyList();
}
+ public org.apache.qpid.server.model.Binding createBinding(String bindingKey, Queue queue,
+ Map<String, Object> bindingArguments,
+ Map<String, Object> attributes)
+ throws AccessControlException, IllegalStateException
+ {
+ VirtualHost virtualHost = _vhost.getVirtualHost();
+
+
+ AMQQueue amqQueue = ((QueueAdapter)queue).getAMQQueue();
+
+ try
+ {
+ if(!virtualHost.getBindingFactory().addBinding(bindingKey, amqQueue, _exchange, bindingArguments))
+ {
+ Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange,
+ bindingArguments);
+
+ Map<String, Object> oldArgs = oldBinding.getArguments();
+ if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments)))
+ {
+ virtualHost.getBindingFactory().replaceBinding(bindingKey, amqQueue, _exchange, bindingArguments);
+ }
+ }
+ Binding binding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, bindingArguments);
+
+ synchronized (_bindingAdapters)
+ {
+ return binding == null ? null : _bindingAdapters.get(binding);
+ }
+ }
+ catch(AMQSecurityException e)
+ {
+ throw new AccessControlException(e.toString());
+ }
+ catch(AMQInternalException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
public String getName()
{
return _exchange.getName();
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Fri Mar 16 22:57:07 2012
@@ -52,6 +52,8 @@ final class QueueAdapter extends Abstrac
public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
{
_vhost = virtualHostAdapter;
+ addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
+
_queue = queue;
_statistics = new QueueStatisticsAdapter(queue);
}
@@ -356,6 +358,11 @@ final class QueueAdapter extends Abstrac
}
}
+ AMQQueue getAMQQueue()
+ {
+ return _queue;
+ }
+
private static class QueueStatisticsAdapter implements Statistics
{
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Mar 16 22:57:07 2012
@@ -442,4 +442,9 @@ final class VirtualHostAdapter extends A
// TODO
throw new UnsupportedOperationException("Not Yet Implemented");
}
+
+ org.apache.qpid.server.virtualhost.VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java Fri Mar 16 22:57:07 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Bind
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.VirtualHost;
@@ -83,6 +84,12 @@ class ExchangeImpl extends AbstractConfi
return null; //TODO
}
+ public Binding createBinding(String bindingKey, Queue queue, Map<String, Object> bindingArguments,
+ Map<String, Object> attributes)
+ {
+ return null; // TODO - Implement
+ }
+
public State getActualState()
{
State vhostState = _virtualHost.getActualState();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org