You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/31 01:31:57 UTC
svn commit: r904934 [3/11] - in /qpid/trunk/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/src/main/java/org/apache/qpid/qmf/ br...
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,1583 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.qmf;
+
+import org.apache.qpid.qmf.schema.BrokerSchema;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class QMFService implements ConfigStore.ConfigEventListener
+{
+
+
+ private IApplicationRegistry _applicationRegistry;
+ private ConfigStore _configStore;
+
+
+ private final Map<String, QMFPackage> _supportedSchemas = new HashMap<String, QMFPackage>();
+ private static final Map<String, ConfigObjectType> _qmfClassMapping = new HashMap<String, ConfigObjectType>();
+
+ static
+ {
+ _qmfClassMapping.put("system", SystemConfigType.getInstance());
+ _qmfClassMapping.put("broker", BrokerConfigType.getInstance());
+ _qmfClassMapping.put("vhost", VirtualHostConfigType.getInstance());
+ _qmfClassMapping.put("exchange", ExchangeConfigType.getInstance());
+ _qmfClassMapping.put("queue", QueueConfigType.getInstance());
+ _qmfClassMapping.put("binding", BindingConfigType.getInstance());
+ _qmfClassMapping.put("connection", ConnectionConfigType.getInstance());
+ _qmfClassMapping.put("session", SessionConfigType.getInstance());
+ _qmfClassMapping.put("subscription", SubscriptionConfigType.getInstance());
+ _qmfClassMapping.put("link", LinkConfigType.getInstance());
+ _qmfClassMapping.put("bridge", BridgeConfigType.getInstance());
+ }
+
+ private final Map<ConfigObjectType, ConfigObjectAdapter> _adapterMap =
+ new HashMap<ConfigObjectType, ConfigObjectAdapter>();
+ private final Map<ConfigObjectType,QMFClass> _classMap =
+ new HashMap<ConfigObjectType,QMFClass>();
+
+
+ private final ConcurrentHashMap<QMFClass,ConcurrentHashMap<ConfiguredObject, QMFObject>> _managedObjects =
+ new ConcurrentHashMap<QMFClass,ConcurrentHashMap<ConfiguredObject, QMFObject>>();
+
+ private final ConcurrentHashMap<QMFClass,ConcurrentHashMap<UUID, QMFObject>> _managedObjectsById =
+ new ConcurrentHashMap<QMFClass,ConcurrentHashMap<UUID, QMFObject>>();
+
+ private static final BrokerSchema PACKAGE = BrokerSchema.getPackage();
+
+ public static interface Listener
+ {
+ public void objectCreated(QMFObject obj);
+ public void objectDeleted(QMFObject obj);
+ }
+
+ private final CopyOnWriteArrayList<Listener> _listeners = new CopyOnWriteArrayList<Listener>();
+
+ abstract class ConfigObjectAdapter<Q extends QMFObject<S,D>, S extends QMFObjectClass<Q,D>, D extends QMFObject.Delegate, T extends ConfigObjectType<T,C>, C extends ConfiguredObject<T,C>>
+ {
+ private final T _type;
+ private final S _qmfClass;
+
+
+ protected ConfigObjectAdapter(final T type, final S qmfClass)
+ {
+ _type = type;
+ _qmfClass = qmfClass;
+ _adapterMap.put(type,this);
+ _classMap.put(type,qmfClass);
+ }
+
+ public T getType()
+ {
+ return _type;
+ }
+
+ public S getQMFClass()
+ {
+ return _qmfClass;
+ }
+
+ protected final Q newInstance(D delegate)
+ {
+ return _qmfClass.newInstance(delegate);
+ }
+
+ public abstract Q createQMFObject(C configObject);
+ }
+
+ private ConfigObjectAdapter<BrokerSchema.SystemObject,
+ BrokerSchema.SystemClass,
+ BrokerSchema.SystemDelegate,
+ SystemConfigType,
+ SystemConfig> _systemObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.SystemObject,
+ BrokerSchema.SystemClass,
+ BrokerSchema.SystemDelegate,
+ SystemConfigType,
+ SystemConfig>(SystemConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.SystemClass.class))
+ {
+
+
+ public BrokerSchema.SystemObject createQMFObject(
+ final SystemConfig configObject)
+ {
+ return newInstance(new SystemDelegate(configObject));
+ }
+ };
+
+ private ConfigObjectAdapter<BrokerSchema.BrokerObject,
+ BrokerSchema.BrokerClass,
+ BrokerSchema.BrokerDelegate,
+ BrokerConfigType,
+ BrokerConfig> _brokerObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.BrokerObject,
+ BrokerSchema.BrokerClass,
+ BrokerSchema.BrokerDelegate,
+ BrokerConfigType,
+ BrokerConfig>(BrokerConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.BrokerClass.class))
+ {
+
+ public BrokerSchema.BrokerObject createQMFObject(
+ final BrokerConfig configObject)
+ {
+ return newInstance(new BrokerDelegate(configObject));
+ }
+ };
+
+ private ConfigObjectAdapter<BrokerSchema.VhostObject,
+ BrokerSchema.VhostClass,
+ BrokerSchema.VhostDelegate,
+ VirtualHostConfigType,
+ VirtualHostConfig> _vhostObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.VhostObject,
+ BrokerSchema.VhostClass,
+ BrokerSchema.VhostDelegate,
+ VirtualHostConfigType,
+ VirtualHostConfig>(VirtualHostConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.VhostClass.class))
+ {
+
+ public BrokerSchema.VhostObject createQMFObject(
+ final VirtualHostConfig configObject)
+ {
+ return newInstance(new VhostDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.ExchangeObject,
+ BrokerSchema.ExchangeClass,
+ BrokerSchema.ExchangeDelegate,
+ ExchangeConfigType,
+ ExchangeConfig> _exchangeObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.ExchangeObject,
+ BrokerSchema.ExchangeClass,
+ BrokerSchema.ExchangeDelegate,
+ ExchangeConfigType,
+ ExchangeConfig>(ExchangeConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.ExchangeClass.class))
+ {
+
+ public BrokerSchema.ExchangeObject createQMFObject(
+ final ExchangeConfig configObject)
+ {
+ return newInstance(new ExchangeDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.QueueObject,
+ BrokerSchema.QueueClass,
+ BrokerSchema.QueueDelegate,
+ QueueConfigType,
+ QueueConfig> _queueObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.QueueObject,
+ BrokerSchema.QueueClass,
+ BrokerSchema.QueueDelegate,
+ QueueConfigType,
+ QueueConfig>(QueueConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.QueueClass.class))
+ {
+
+ public BrokerSchema.QueueObject createQMFObject(
+ final QueueConfig configObject)
+ {
+ return newInstance(new QueueDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.BindingObject,
+ BrokerSchema.BindingClass,
+ BrokerSchema.BindingDelegate,
+ BindingConfigType,
+ BindingConfig> _bindingObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.BindingObject,
+ BrokerSchema.BindingClass,
+ BrokerSchema.BindingDelegate,
+ BindingConfigType,
+ BindingConfig>(BindingConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.BindingClass.class))
+ {
+
+ public BrokerSchema.BindingObject createQMFObject(
+ final BindingConfig configObject)
+ {
+ return newInstance(new BindingDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.ConnectionObject,
+ BrokerSchema.ConnectionClass,
+ BrokerSchema.ConnectionDelegate,
+ ConnectionConfigType,
+ ConnectionConfig> _connectionObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.ConnectionObject,
+ BrokerSchema.ConnectionClass,
+ BrokerSchema.ConnectionDelegate,
+ ConnectionConfigType,
+ ConnectionConfig>(ConnectionConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.ConnectionClass.class))
+ {
+
+ public BrokerSchema.ConnectionObject createQMFObject(
+ final ConnectionConfig configObject)
+ {
+ return newInstance(new ConnectionDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.SessionObject,
+ BrokerSchema.SessionClass,
+ BrokerSchema.SessionDelegate,
+ SessionConfigType,
+ SessionConfig> _sessionObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.SessionObject,
+ BrokerSchema.SessionClass,
+ BrokerSchema.SessionDelegate,
+ SessionConfigType,
+ SessionConfig>(SessionConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.SessionClass.class))
+ {
+
+ public BrokerSchema.SessionObject createQMFObject(
+ final SessionConfig configObject)
+ {
+ return newInstance(new SessionDelegate(configObject));
+ }
+ };
+
+ private ConfigObjectAdapter<BrokerSchema.SubscriptionObject,
+ BrokerSchema.SubscriptionClass,
+ BrokerSchema.SubscriptionDelegate,
+ SubscriptionConfigType,
+ SubscriptionConfig> _subscriptionObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.SubscriptionObject,
+ BrokerSchema.SubscriptionClass,
+ BrokerSchema.SubscriptionDelegate,
+ SubscriptionConfigType,
+ SubscriptionConfig>(SubscriptionConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.SubscriptionClass.class))
+ {
+
+ public BrokerSchema.SubscriptionObject createQMFObject(
+ final SubscriptionConfig configObject)
+ {
+ return newInstance(new SubscriptionDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.LinkObject,
+ BrokerSchema.LinkClass,
+ BrokerSchema.LinkDelegate,
+ LinkConfigType,
+ LinkConfig> _linkObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.LinkObject,
+ BrokerSchema.LinkClass,
+ BrokerSchema.LinkDelegate,
+ LinkConfigType,
+ LinkConfig>(LinkConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.LinkClass.class))
+ {
+
+ public BrokerSchema.LinkObject createQMFObject(
+ final LinkConfig configObject)
+ {
+ return newInstance(new LinkDelegate(configObject));
+ }
+ };
+
+
+ private ConfigObjectAdapter<BrokerSchema.BridgeObject,
+ BrokerSchema.BridgeClass,
+ BrokerSchema.BridgeDelegate,
+ BridgeConfigType,
+ BridgeConfig> _bridgeObjectAdapter =
+ new ConfigObjectAdapter<BrokerSchema.BridgeObject,
+ BrokerSchema.BridgeClass,
+ BrokerSchema.BridgeDelegate,
+ BridgeConfigType,
+ BridgeConfig>(BridgeConfigType.getInstance(),
+ PACKAGE.getQMFClassInstance(BrokerSchema.BridgeClass.class))
+ {
+
+ public BrokerSchema.BridgeObject createQMFObject(
+ final BridgeConfig configObject)
+ {
+ return newInstance(new BridgeDelegate(configObject));
+ }
+ };
+
+
+
+ public QMFService(ConfigStore configStore, IApplicationRegistry applicationRegistry)
+ {
+ _configStore = configStore;
+ _applicationRegistry = applicationRegistry;
+ registerSchema(PACKAGE);
+
+ for(ConfigObjectType v : _qmfClassMapping.values())
+ {
+ configStore.addConfigEventListener(v, this);
+ }
+ init();
+ }
+
+ public void close()
+ {
+ _managedObjects.clear();
+ _managedObjectsById.clear();
+ _classMap.clear();
+ _adapterMap.clear();
+ _supportedSchemas.clear();
+ }
+
+
+ public void registerSchema(QMFPackage qmfPackage)
+ {
+ _supportedSchemas.put(qmfPackage.getName(), qmfPackage);
+ }
+
+
+ public Collection<QMFPackage> getSupportedSchemas()
+ {
+ return _supportedSchemas.values();
+ }
+
+ public QMFPackage getPackage(String aPackage)
+ {
+ return _supportedSchemas.get(aPackage);
+ }
+
+ public void onEvent(final ConfiguredObject object, final ConfigStore.Event evt)
+ {
+
+ switch (evt)
+ {
+ case CREATED:
+ manageObject(object);
+ break;
+
+ case DELETED:
+ unmanageObject(object);
+ break;
+ }
+ }
+
+ public QMFObject getObjectById(QMFClass qmfclass, UUID id)
+ {
+ ConcurrentHashMap<UUID, QMFObject> map = _managedObjectsById.get(qmfclass);
+ if(map != null)
+ {
+ return map.get(id);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private void unmanageObject(final ConfiguredObject object)
+ {
+ final QMFClass qmfClass = _classMap.get(object.getConfigType());
+
+ ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass);
+ if(classObjects != null)
+ {
+ QMFObject qmfObject = classObjects.remove(object);
+ if(qmfObject != null)
+ {
+ _managedObjectsById.get(qmfClass).remove(object.getId());
+ objectRemoved(qmfObject);
+ }
+ }
+ }
+
+ private void manageObject(final ConfiguredObject object)
+ {
+ ConfigObjectAdapter adapter = _adapterMap.get(object.getConfigType());
+ QMFObject qmfObject = adapter.createQMFObject(object);
+ final QMFClass qmfClass = qmfObject.getQMFClass();
+ ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass);
+
+ if(classObjects == null)
+ {
+ classObjects = new ConcurrentHashMap<ConfiguredObject, QMFObject>();
+ if(_managedObjects.putIfAbsent(qmfClass, classObjects) != null)
+ {
+ classObjects = _managedObjects.get(qmfClass);
+ }
+ }
+
+ ConcurrentHashMap<UUID, QMFObject> classObjectsById = _managedObjectsById.get(qmfClass);
+ if(classObjectsById == null)
+ {
+ classObjectsById = new ConcurrentHashMap<UUID, QMFObject>();
+ if(_managedObjectsById.putIfAbsent(qmfClass, classObjectsById) != null)
+ {
+ classObjectsById = _managedObjectsById.get(qmfClass);
+ }
+ }
+
+ classObjectsById.put(object.getId(),qmfObject);
+
+ if(classObjects.putIfAbsent(object, qmfObject) == null);
+ {
+ objectAdded(qmfObject);
+ }
+ }
+
+ private void objectRemoved(final QMFObject qmfObject)
+ {
+ qmfObject.setDeleteTime();
+ for(Listener l : _listeners)
+ {
+ l.objectDeleted(qmfObject);
+ }
+ }
+
+ private void objectAdded(final QMFObject qmfObject)
+ {
+ for(Listener l : _listeners)
+ {
+ l.objectCreated(qmfObject);
+ }
+ }
+
+ public void addListener(Listener l)
+ {
+ _listeners.add(l);
+ }
+
+ public void removeListener(Listener l)
+ {
+ _listeners.remove(l);
+ }
+
+
+ private void init()
+ {
+ for(QMFClass qmfClass : PACKAGE.getClasses())
+ {
+ ConfigObjectType configType = getConfigType(qmfClass);
+ if(configType != null)
+ {
+ Collection<ConfiguredObject> objects = _configStore.getConfiguredObjects(configType);
+ for(ConfiguredObject object : objects)
+ {
+ manageObject(object);
+ }
+ }
+ }
+ }
+
+ public Collection<QMFObject> getObjects(QMFClass qmfClass)
+ {
+ ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass);
+ if(classObjects != null)
+ {
+ return classObjects.values();
+ }
+ else
+ {
+ return Collections.EMPTY_SET;
+ }
+ }
+
+ private QMFObject adapt(final ConfiguredObject object)
+ {
+ if(object == null)
+ {
+ return null;
+ }
+
+ QMFClass qmfClass = _classMap.get(object.getConfigType());
+ ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass);
+ if(classObjects != null)
+ {
+ QMFObject qmfObject = classObjects.get(object);
+ if(qmfObject != null)
+ {
+ return qmfObject;
+ }
+ }
+
+ return _adapterMap.get(object.getConfigType()).createQMFObject(object);
+ }
+
+ private ConfigObjectType getConfigType(final QMFClass qmfClass)
+ {
+ return _qmfClassMapping.get(qmfClass.getName());
+ }
+
+ private static class SystemDelegate implements BrokerSchema.SystemDelegate
+ {
+ private final SystemConfig _obj;
+
+ public SystemDelegate(final SystemConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public UUID getSystemId()
+ {
+ return _obj.getId();
+ }
+
+ public String getOsName()
+ {
+ return _obj.getOperatingSystemName();
+ }
+
+ public String getNodeName()
+ {
+ return _obj.getNodeName();
+ }
+
+ public String getRelease()
+ {
+ return _obj.getOSRelease();
+ }
+
+ public String getVersion()
+ {
+ return _obj.getOSVersion();
+ }
+
+ public String getMachine()
+ {
+ return _obj.getOSArchitecture();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class BrokerDelegate implements BrokerSchema.BrokerDelegate
+ {
+ private final BrokerConfig _obj;
+
+ public BrokerDelegate(final BrokerConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.SystemObject getSystemRef()
+ {
+ return (BrokerSchema.SystemObject) adapt(_obj.getSystem());
+ }
+
+ public Integer getPort()
+ {
+ return _obj.getPort();
+ }
+
+ public Integer getWorkerThreads()
+ {
+ return _obj.getWorkerThreads();
+ }
+
+ public Integer getMaxConns()
+ {
+ return _obj.getMaxConnections();
+ }
+
+ public Integer getConnBacklog()
+ {
+ return _obj.getConnectionBacklogLimit();
+ }
+
+ public Long getStagingThreshold()
+ {
+ return _obj.getStagingThreshold();
+ }
+
+ public Integer getMgmtPubInterval()
+ {
+ return _obj.getManagementPublishInterval();
+ }
+
+ public String getVersion()
+ {
+ return _obj.getVersion();
+ }
+
+ public String getDataDir()
+ {
+ return _obj.getDataDirectory();
+ }
+
+ public Long getUptime()
+ {
+ return (System.currentTimeMillis() - _obj.getCreateTime()) * 1000000L;
+ }
+
+ public BrokerSchema.BrokerClass.EchoMethodResponseCommand echo(final BrokerSchema.BrokerClass.EchoMethodResponseCommandFactory factory,
+ final Long sequence,
+ final String body)
+ {
+ return factory.createResponseCommand(sequence, body);
+ }
+
+ public BrokerSchema.BrokerClass.ConnectMethodResponseCommand connect(final BrokerSchema.BrokerClass.ConnectMethodResponseCommandFactory factory,
+ final String host,
+ final Long port,
+ final Boolean durable,
+ final String authMechanism,
+ final String username,
+ final String password,
+ final String transport)
+ {
+ _obj.createBrokerConnection(transport, host, port.intValue(), durable, authMechanism, username, password);
+
+ return factory.createResponseCommand();
+ }
+
+ public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
+ final String srcQueue,
+ final String destQueue,
+ final Long qty)
+ {
+ // todo
+ throw new UnsupportedOperationException();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class VhostDelegate implements BrokerSchema.VhostDelegate
+ {
+ private final VirtualHostConfig _obj;
+
+ public VhostDelegate(final VirtualHostConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.BrokerObject getBrokerRef()
+ {
+ return (BrokerSchema.BrokerObject) adapt(_obj.getBroker());
+ }
+
+ public String getName()
+ {
+ return _obj.getName();
+ }
+
+ public String getFederationTag()
+ {
+ return _obj.getFederationTag();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class ExchangeDelegate implements BrokerSchema.ExchangeDelegate
+ {
+ private final ExchangeConfig _obj;
+
+ public ExchangeDelegate(final ExchangeConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.VhostObject getVhostRef()
+ {
+ return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost());
+ }
+
+ public String getName()
+ {
+ return _obj.getName();
+ }
+
+ public String getType()
+ {
+ return _obj.getType().getName().toString();
+ }
+
+ public Boolean getDurable()
+ {
+ return _obj.isDurable();
+ }
+
+ public Boolean getAutoDelete()
+ {
+ return _obj.isAutoDelete();
+ }
+
+ public BrokerSchema.ExchangeObject getAltExchange()
+ {
+ if(_obj.getAlternateExchange() != null)
+ {
+ return (BrokerSchema.ExchangeObject) adapt(_obj.getAlternateExchange());
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Map getArguments()
+ {
+ return _obj.getArguments();
+ }
+
+ public Long getProducerCount()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getProducerCountHigh()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getProducerCountLow()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getBindingCount()
+ {
+ return _obj.getBindingCount();
+ }
+
+ public Long getBindingCountHigh()
+ {
+ return _obj.getBindingCountHigh();
+ }
+
+ public Long getBindingCountLow()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMsgReceives()
+ {
+ return _obj.getMsgReceives();
+ }
+
+ public Long getMsgDrops()
+ {
+ return getMsgReceives() - getMsgRoutes();
+ }
+
+ public Long getMsgRoutes()
+ {
+ return _obj.getMsgRoutes();
+ }
+
+ public Long getByteReceives()
+ {
+ return _obj.getByteReceives();
+ }
+
+ public Long getByteDrops()
+ {
+ return getByteReceives() - getByteRoutes();
+ }
+
+ public Long getByteRoutes()
+ {
+ return _obj.getByteRoutes();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class QueueDelegate implements BrokerSchema.QueueDelegate
+ {
+ private final QueueConfig _obj;
+
+ public QueueDelegate(final QueueConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.VhostObject getVhostRef()
+ {
+ return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost());
+ }
+
+ public String getName()
+ {
+ return _obj.getName();
+ }
+
+ public Boolean getDurable()
+ {
+ return _obj.isDurable();
+ }
+
+ public Boolean getAutoDelete()
+ {
+ return _obj.isAutoDelete();
+ }
+
+ public Boolean getExclusive()
+ {
+ return _obj.isExclusive();
+ }
+
+ public BrokerSchema.ExchangeObject getAltExchange()
+ {
+ if(_obj.getAlternateExchange() != null)
+ {
+ return (BrokerSchema.ExchangeObject) adapt(_obj.getAlternateExchange());
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Long getMsgTotalEnqueues()
+ {
+ return _obj.getReceivedMessageCount();
+ }
+
+ public Long getMsgTotalDequeues()
+ {
+ return _obj.getMessageDequeueCount();
+ }
+
+ public Long getMsgTxnEnqueues()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMsgTxnDequeues()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMsgPersistEnqueues()
+ {
+ return _obj.getPersistentMsgEnqueues();
+ }
+
+ public Long getMsgPersistDequeues()
+ {
+ return _obj.getPersistentMsgDequeues();
+ }
+
+ public Long getMsgDepth()
+ {
+ return (long) _obj.getMessageCount();
+ }
+
+ public Long getByteDepth()
+ {
+ return _obj.getQueueDepth();
+ }
+
+ public Long getByteTotalEnqueues()
+ {
+ return _obj.getTotalEnqueueSize();
+ }
+
+ public Long getByteTotalDequeues()
+ {
+ return _obj.getTotalDequeueSize();
+ }
+
+ public Long getByteTxnEnqueues()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getByteTxnDequeues()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getBytePersistEnqueues()
+ {
+ return _obj.getPersistentByteEnqueues();
+ }
+
+ public Long getBytePersistDequeues()
+ {
+ return _obj.getPersistentByteDequeues();
+ }
+
+ public Long getConsumerCount()
+ {
+ return (long) _obj.getConsumerCount();
+ }
+
+ public Long getConsumerCountHigh()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getConsumerCountLow()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getBindingCount()
+ {
+ return (long) _obj.getBindingCount();
+ }
+
+ public Long getBindingCountHigh()
+ {
+ return (long) _obj.getBindingCountHigh();
+ }
+
+ public Long getBindingCountLow()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getUnackedMessages()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getUnackedMessagesHigh()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getUnackedMessagesLow()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMessageLatencySamples()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMessageLatencyMin()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMessageLatencyMax()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMessageLatencyAverage()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
+ final Long request)
+ {
+ _obj.purge(request);
+ return factory.createResponseCommand();
+ }
+
+ public Map getArguments()
+ {
+ return _obj.getArguments();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class BindingDelegate implements BrokerSchema.BindingDelegate
+ {
+ private final BindingConfig _obj;
+
+ public BindingDelegate(final BindingConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.ExchangeObject getExchangeRef()
+ {
+ return (BrokerSchema.ExchangeObject) adapt(_obj.getExchange());
+ }
+
+ public BrokerSchema.QueueObject getQueueRef()
+ {
+ return (BrokerSchema.QueueObject) adapt(_obj.getQueue());
+ }
+
+ public String getBindingKey()
+ {
+ return _obj.getBindingKey();
+ }
+
+ public Map getArguments()
+ {
+ return _obj.getArguments();
+ }
+
+ public String getOrigin()
+ {
+ return _obj.getOrigin();
+ }
+
+ public Long getMsgMatched()
+ {
+ // TODO
+ return _obj.getMatches();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class ConnectionDelegate implements BrokerSchema.ConnectionDelegate
+ {
+ private final ConnectionConfig _obj;
+
+ public ConnectionDelegate(final ConnectionConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.VhostObject getVhostRef()
+ {
+ return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost());
+ }
+
+ public String getAddress()
+ {
+ return _obj.getAddress();
+ }
+
+ public Boolean getIncoming()
+ {
+ return _obj.isIncoming();
+ }
+
+ public Boolean getSystemConnection()
+ {
+ return _obj.isSystemConnection();
+ }
+
+ public Boolean getFederationLink()
+ {
+ return _obj.isFederationLink();
+ }
+
+ public String getAuthIdentity()
+ {
+ return _obj.getAuthId();
+ }
+
+ public String getRemoteProcessName()
+ {
+ return _obj.getRemoteProcessName();
+ }
+
+ public Long getRemotePid()
+ {
+ Integer remotePID = _obj.getRemotePID();
+ return remotePID == null ? null : (long) remotePID;
+ }
+
+ public Long getRemoteParentPid()
+ {
+ Integer remotePPID = _obj.getRemoteParentPID();
+ return remotePPID == null ? null : (long) remotePPID;
+
+ }
+
+ public Boolean getClosing()
+ {
+ return false;
+ }
+
+ public Long getFramesFromClient()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getFramesToClient()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getBytesFromClient()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getBytesToClient()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory)
+ {
+ //todo
+ throw new UnsupportedOperationException();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ // TODO
+ return 0;
+ }
+ }
+
+ private class SessionDelegate implements BrokerSchema.SessionDelegate
+ {
+ private final SessionConfig _obj;
+
+ public SessionDelegate(final SessionConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.VhostObject getVhostRef()
+ {
+ return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost());
+ }
+
+ public String getName()
+ {
+ return _obj.getSessionName();
+ }
+
+ public Integer getChannelId()
+ {
+ return _obj.getChannel();
+ }
+
+ public BrokerSchema.ConnectionObject getConnectionRef()
+ {
+ return (BrokerSchema.ConnectionObject) adapt(_obj.getConnectionConfig());
+ }
+
+ public Long getDetachedLifespan()
+ {
+ return _obj.getDetachedLifespan();
+ }
+
+ public Boolean getAttached()
+ {
+ return _obj.isAttached();
+ }
+
+ public Long getExpireTime()
+ {
+ return _obj.getExpiryTime();
+ }
+
+ public Long getMaxClientRate()
+ {
+ return _obj.getMaxClientRate();
+ }
+
+ public Long getFramesOutstanding()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getTxnStarts()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getTxnCommits()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getTxnRejects()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getTxnCount()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getClientCredit()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public BrokerSchema.SessionClass.SolicitAckMethodResponseCommand solicitAck(final BrokerSchema.SessionClass.SolicitAckMethodResponseCommandFactory factory)
+ {
+ //todo
+ throw new UnsupportedOperationException();
+ }
+
+ public BrokerSchema.SessionClass.DetachMethodResponseCommand detach(final BrokerSchema.SessionClass.DetachMethodResponseCommandFactory factory)
+ {
+ //todo
+ throw new UnsupportedOperationException();
+ }
+
+ public BrokerSchema.SessionClass.ResetLifespanMethodResponseCommand resetLifespan(final BrokerSchema.SessionClass.ResetLifespanMethodResponseCommandFactory factory)
+ {
+ //todo
+ throw new UnsupportedOperationException();
+ }
+
+ public BrokerSchema.SessionClass.CloseMethodResponseCommand close(final BrokerSchema.SessionClass.CloseMethodResponseCommandFactory factory)
+ {
+ //todo
+ throw new UnsupportedOperationException();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ // TODO
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
+
+ private class SubscriptionDelegate implements BrokerSchema.SubscriptionDelegate
+ {
+ private final SubscriptionConfig _obj;
+
+ private SubscriptionDelegate(final SubscriptionConfig obj)
+ {
+ _obj = obj;
+ }
+
+
+ public BrokerSchema.SessionObject getSessionRef()
+ {
+ return (BrokerSchema.SessionObject) adapt(_obj.getSessionConfig());
+ }
+
+ public BrokerSchema.QueueObject getQueueRef()
+ {
+ return (BrokerSchema.QueueObject) adapt(_obj.getQueue());
+ }
+
+ public String getName()
+ {
+ return _obj.getName();
+ }
+
+ public Boolean getBrowsing()
+ {
+ return _obj.isBrowsing();
+ }
+
+ public Boolean getAcknowledged()
+ {
+ return _obj.isExplicitAcknowledge();
+ }
+
+ public Boolean getExclusive()
+ {
+ return _obj.isExclusive();
+ }
+
+ public String getCreditMode()
+ {
+ return _obj.getCreditMode();
+ }
+
+ public Map getArguments()
+ {
+ return _obj.getArguments();
+ }
+
+ public Long getDelivered()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ // TODO
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
+
+ private class BridgeDelegate implements BrokerSchema.BridgeDelegate
+ {
+ private final BridgeConfig _obj;
+
+ private BridgeDelegate(final BridgeConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.LinkObject getLinkRef()
+ {
+ return (BrokerSchema.LinkObject) adapt(_obj.getLink());
+ }
+
+ public Integer getChannelId()
+ {
+ return _obj.getChannelId();
+ }
+
+ public Boolean getDurable()
+ {
+ return _obj.isDurable();
+ }
+
+ public String getSrc()
+ {
+ return _obj.getSource();
+ }
+
+ public String getDest()
+ {
+ return _obj.getDestination();
+ }
+
+ public String getKey()
+ {
+ return _obj.getKey();
+ }
+
+ public Boolean getSrcIsQueue()
+ {
+ return _obj.isQueueBridge();
+ }
+
+ public Boolean getSrcIsLocal()
+ {
+ return _obj.isLocalSource();
+ }
+
+ public String getTag()
+ {
+ return _obj.getTag();
+ }
+
+ public String getExcludes()
+ {
+ return _obj.getExcludes();
+ }
+
+ public Boolean getDynamic()
+ {
+ return _obj.isDynamic();
+ }
+
+ public Integer getSync()
+ {
+ return _obj.getAckBatching();
+ }
+
+ public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
+ {
+ return null;
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+ private class LinkDelegate implements BrokerSchema.LinkDelegate
+ {
+ private final LinkConfig _obj;
+
+ private LinkDelegate(final LinkConfig obj)
+ {
+ _obj = obj;
+ }
+
+ public BrokerSchema.VhostObject getVhostRef()
+ {
+ return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost());
+ }
+
+ public String getHost()
+ {
+ return _obj.getHost();
+ }
+
+ public Integer getPort()
+ {
+ return _obj.getPort();
+ }
+
+ public String getTransport()
+ {
+ return _obj.getTransport();
+ }
+
+ public Boolean getDurable()
+ {
+ return _obj.isDurable();
+ }
+
+ public String getState()
+ {
+ // TODO
+ return "";
+ }
+
+ public String getLastError()
+ {
+ // TODO
+ return "";
+ }
+
+ public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
+ {
+ _obj.close();
+ return factory.createResponseCommand();
+ }
+
+ public BrokerSchema.LinkClass.BridgeMethodResponseCommand bridge(final BrokerSchema.LinkClass.BridgeMethodResponseCommandFactory factory,
+ final Boolean durable,
+ final String src,
+ final String dest,
+ final String key,
+ final String tag,
+ final String excludes,
+ final Boolean srcIsQueue,
+ final Boolean srcIsLocal,
+ final Boolean dynamic,
+ final Integer sync)
+ {
+ _obj.createBridge(durable, dynamic, srcIsQueue, srcIsLocal, src, dest, key, tag, excludes);
+ return factory.createResponseCommand();
+ }
+
+ public UUID getId()
+ {
+ return _obj.getId();
+ }
+
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+ }
+
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.qmf;
+
+import org.apache.qpid.transport.codec.Encoder;
+
+import java.util.LinkedHashMap;
+
+public class QMFStatistic
+{
+ private final LinkedHashMap<String,Object> _map = new LinkedHashMap<String,Object>();
+ private static final String NAME = "name";
+ private static final String TYPE = "type";
+ private static final String UNIT = "unit";
+ private static final String DESCRIPTION = "desc";
+
+
+ public QMFStatistic(String name, QMFType type, String unit, String description)
+ {
+ _map.put(NAME, name);
+ _map.put(TYPE, type.codeValue());
+ if(unit != null)
+ {
+ _map.put(UNIT, unit);
+ }
+ if(description != null)
+ {
+ _map.put(DESCRIPTION, description);
+ }
+
+ }
+
+ public void encode(Encoder encoder)
+ {
+ encoder.writeMap(_map);
+ }
+
+ public String getName()
+ {
+ return (String) _map.get(NAME);
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf;
+
+public enum QMFType
+{
+
+ UINT8,
+ UINT16,
+ UINT32,
+ UINT64,
+ UNKNOWN,
+ STR8,
+ STR16,
+ ABSTIME,
+ DELTATIME,
+ OBJECTREFERENCE,
+ BOOLEAN,
+ FLOAT,
+ DOUBLE,
+ UUID,
+ MAP,
+ INT8,
+ INT16,
+ INT32,
+ INT64,
+ OBJECT,
+ LIST,
+ ARRAY;
+
+ public int codeValue()
+ {
+ return ordinal()+1;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun Jan 31 00:31:49 2010
@@ -20,45 +20,72 @@
*/
package org.apache.qpid.server;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.*;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class AMQChannel
+public class AMQChannel implements SessionConfig
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -123,6 +150,7 @@
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
+ private final UUID _id;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -132,15 +160,22 @@
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
-
+ _id = getConfigStore().createId();
_actor.message(ChannelMessages.CHN_CREATE());
+ getConfigStore().addConfiguredObject(this);
+
_messageStore = messageStore;
// by default the session is non-transactional
_transaction = new AutoCommitTransaction(_messageStore);
}
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
@@ -220,7 +255,7 @@
try
{
- final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues();
+ final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
@@ -411,6 +446,8 @@
_logger.error("Caught AMQException whilst attempting to reque:" + e);
}
+ getConfigStore().removeConfiguredObject(this);
+
}
private void setClosing(boolean closing)
@@ -970,10 +1007,10 @@
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
- private ArrayList<AMQQueue> _destinationQueues;
+ private ArrayList<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<AMQQueue> destinationQueues)
+ ArrayList<? extends BaseQueue> destinationQueues)
{
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
@@ -988,53 +1025,24 @@
final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
MessageReference ref = amqMessage.newReference();
- for(AMQQueue queue : _destinationQueues)
+ for(final BaseQueue queue : _destinationQueues)
{
+ BaseQueue.PostEnqueueAction action;
- QueueEntry entry = queue.enqueue(amqMessage);
- queue.checkCapacity(AMQChannel.this);
-
-
- if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
+ if(immediate)
{
+ action = new ImmediateAction(queue);
+ }
+ else
+ {
+ action = null;
+ }
+ queue.enqueue(amqMessage, action);
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
- {
- @Override
- public void postCommit()
- {
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
- }
- }
- );
- txn.commit();
-
-
-
-
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(AMQChannel.this);
}
}
@@ -1057,6 +1065,60 @@
// Maybe keep track of entries that were created and then delete them here in case of failure
// to in memory enqueue
}
+
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+ private final BaseQueue _queue;
+
+ public ImmediateAction(BaseQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public void onEnqueue(QueueEntry entry)
+ {
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
+ {
+
+
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(_queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
+ {
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ super.postCommit();
+ }
+ }
+ );
+ txn.commit();
+
+
+ }
+
+ }
+ }
}
private class MessageAcknowledgeAction implements ServerTransaction.Action
@@ -1163,7 +1225,7 @@
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getName().toString()));
+ _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getNameShortString().toString()));
flow(false);
}
}
@@ -1188,9 +1250,70 @@
AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
_session.writeFrame(responseBody.generateFrame(_channelId));
}
-
+
public boolean getBlocking()
{
return _blocking.get();
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return getProtocolSession().getVirtualHost();
+ }
+
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public SessionConfigType getConfigType()
+ {
+ return SessionConfigType.getInstance();
+ }
+
+ public int getChannel()
+ {
+ return getChannelId();
+ }
+
+ public boolean isAttached()
+ {
+ return true;
+ }
+
+ public long getDetachedLifespan()
+ {
+ return 0;
+ }
+
+ public ConnectionConfig getConnectionConfig()
+ {
+ return (AMQProtocolEngine)getProtocolSession();
+ }
+
+ public Long getExpiryTime()
+ {
+ return null;
+ }
+
+ public Long getMaxClientRate()
+ {
+ return null;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getSessionName()
+ {
+ return getConnectionConfig().getAddress() + "/" + getChannelId();
+ }
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.binding;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Binding
+{
+ private final String _bindingKey;
+ private final AMQQueue _queue;
+ private final Exchange _exchange;
+ private final Map<String, Object> _arguments;
+ private final UUID _id;
+ private final AtomicLong _matches = new AtomicLong();
+
+ Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ {
+ _id = id;
+ _bindingKey = bindingKey;
+ _queue = queue;
+ _exchange = exchange;
+ _arguments = arguments == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(arguments);
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getBindingKey()
+ {
+ return _bindingKey;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public Exchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return _arguments;
+ }
+
+ public void incrementMatches()
+ {
+ _matches.incrementAndGet();
+ }
+
+ public long getMatches()
+ {
+ return _matches.get();
+ }
+
+ boolean isDurable()
+ {
+ return _queue.isDurable() && _exchange.isDurable();
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final Binding binding = (Binding) o;
+
+ if (!_bindingKey.equals(binding._bindingKey)) return false;
+ if (!_exchange.equals(binding._exchange)) return false;
+ if (!_queue.equals(binding._queue)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _bindingKey.hashCode();
+ result = 31 * result + _queue.hashCode();
+ result = 31 * result + _exchange.hashCode();
+ return result;
+ }
+
+
+
+
+
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,290 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.binding;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.BindingConfig;
+import org.apache.qpid.server.configuration.BindingConfigType;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BindingMessages;
+import org.apache.qpid.server.logging.subjects.BindingLogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class BindingFactory
+{
+
+ private final VirtualHost _virtualHost;
+ private final DurableConfigurationStore.Source _configSource;
+ private final Exchange _defaultExchange;
+
+ private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
+
+
+ public BindingFactory(final VirtualHost vhost)
+ {
+ this(vhost,vhost.getExchangeRegistry().getDefaultExchange());
+ }
+
+ public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange)
+ {
+ _configSource = configSource;
+ _defaultExchange = defaultExchange;
+ if(configSource instanceof VirtualHost)
+ {
+ _virtualHost = (VirtualHost) configSource;
+ }
+ else
+ {
+ _virtualHost = null;
+ }
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+
+
+ private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
+ {
+ private final BindingLogSubject _logSubject;
+ //TODO
+ private long _createTime = System.currentTimeMillis();
+
+ private BindingImpl(String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ {
+ super(queue.getVirtualHost().getConfigStore().createId(),bindingKey, queue, exchange, arguments);
+ _logSubject = new BindingLogSubject(bindingKey,exchange,queue);
+
+ }
+
+
+ public void doTask(final AMQQueue queue) throws AMQException
+ {
+ removeBinding(this);
+ }
+
+ public void onClose(final Exchange exchange)
+ {
+ removeBinding(this);
+ }
+
+ void logCreation()
+ {
+ CurrentActor.get().message(_logSubject, BindingMessages.BND_CREATED(String.valueOf(getArguments()), getArguments() != null && !getArguments().isEmpty()));
+ }
+
+ void logDestruction()
+ {
+ CurrentActor.get().message(_logSubject, BindingMessages.BND_DELETED());
+ }
+
+ public String getOrigin()
+ {
+ return (String) getArguments().get("qpid.fed.origin");
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public BindingConfigType getConfigType()
+ {
+ return BindingConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return _virtualHost;
+ }
+
+ public boolean isDurable()
+ {
+ return getQueue().isDurable() && getExchange().isDurable();
+ }
+
+ }
+
+
+
+ public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
+ {
+ return makeBinding(bindingKey, queue, exchange, arguments, false, false);
+ }
+
+
+ public boolean replaceBinding(final String bindingKey,
+ final AMQQueue queue,
+ final Exchange exchange,
+ final Map<String, Object> arguments)
+ {
+ return makeBinding(bindingKey, queue, exchange, arguments, false, true);
+ }
+
+ private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force)
+ {
+ assert queue != null;
+ if(bindingKey == null)
+ {
+ bindingKey = "";
+ }
+ if(exchange == null)
+ {
+ exchange = _defaultExchange;
+ }
+ if(arguments == null)
+ {
+ arguments = Collections.EMPTY_MAP;
+ }
+
+ BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
+ BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
+ if(existingMapping == null || force)
+ {
+ if(existingMapping != null)
+ {
+ removeBinding(existingMapping);
+ }
+
+ if(b.isDurable() && !restore)
+ {
+ try
+ {
+ _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ queue.addQueueDeleteTask(b);
+ exchange.addCloseTask(b);
+ queue.addBinding(b);
+ exchange.addBinding(b);
+ getConfigStore().addConfiguredObject(b);
+ b.logCreation();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+
+ }
+
+ private ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
+ public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap)
+ {
+ makeBinding(bindingKey,queue,exchange,argumentMap,true, false);
+ }
+
+ public void removeBinding(final Binding b)
+ {
+ removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments());
+ }
+
+
+ public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
+ {
+ assert queue != null;
+ if(bindingKey == null)
+ {
+ bindingKey = "";
+ }
+ if(exchange == null)
+ {
+ exchange = _defaultExchange;
+ }
+ if(arguments == null)
+ {
+ arguments = Collections.EMPTY_MAP;
+ }
+
+ BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments));
+
+ if(b != null)
+ {
+ exchange.removeBinding(b);
+ queue.removeBinding(b);
+ exchange.removeCloseTask(b);
+ queue.removeQueueDeleteTask(b);
+
+ if(b.isDurable())
+ {
+ try
+ {
+ _configSource.getDurableConfigurationStore().unbindQueue(exchange,
+ new AMQShortString(bindingKey),
+ queue,
+ FieldTable.convertToFieldTable(arguments));
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ b.logDestruction();
+ getConfigStore().removeConfiguredObject(b);
+
+ }
+
+ return b;
+ }
+
+ public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
+ {
+ assert queue != null;
+ if(bindingKey == null)
+ {
+ bindingKey = "";
+ }
+ if(exchange == null)
+ {
+ exchange = _defaultExchange;
+ }
+ if(arguments == null)
+ {
+ arguments = Collections.EMPTY_MAP;
+ }
+
+ BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
+ return _bindings.get(b);
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfig.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfig.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfig.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration;
+
+import java.util.Map;
+
+
+public interface BindingConfig extends ConfiguredObject<BindingConfigType, BindingConfig>
+{
+
+ ExchangeConfig getExchange();
+
+ QueueConfig getQueue();
+
+ String getBindingKey();
+
+ Map<String, Object> getArguments();
+
+ String getOrigin();
+
+ long getCreateTime();
+
+ long getMatches();
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfigType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfigType.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfigType.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BindingConfigType.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.server.exchange.ExchangeType;
+
+import java.util.*;
+
+public final class BindingConfigType extends ConfigObjectType<BindingConfigType, BindingConfig>
+{
+ private static final List<BindingProperty<?>> BINDING_PROPERTIES = new ArrayList<BindingProperty<?>>();
+
+ public static interface BindingProperty<S> extends ConfigProperty<BindingConfigType, BindingConfig, S>
+ {
+ }
+
+ private abstract static class BindingReadWriteProperty<S> extends ConfigProperty.ReadWriteConfigProperty<BindingConfigType, BindingConfig, S> implements BindingProperty<S>
+ {
+ public BindingReadWriteProperty(String name)
+ {
+ super(name);
+ BINDING_PROPERTIES.add(this);
+ }
+ }
+
+ private abstract static class BindingReadOnlyProperty<S> extends ConfigProperty.ReadOnlyConfigProperty<BindingConfigType, BindingConfig, S> implements BindingProperty<S>
+ {
+ public BindingReadOnlyProperty(String name)
+ {
+ super(name);
+ BINDING_PROPERTIES.add(this);
+ }
+ }
+
+ public static final BindingReadOnlyProperty<ExchangeConfig> EXCHANGE_PROPERTY = new BindingReadOnlyProperty<ExchangeConfig>("exchange")
+ {
+ public ExchangeConfig getValue(BindingConfig object)
+ {
+ return object.getExchange();
+ }
+ };
+
+ public static final BindingReadOnlyProperty<QueueConfig> QUEUE_PROPERTY = new BindingReadOnlyProperty<QueueConfig>("queue")
+ {
+ public QueueConfig getValue(BindingConfig object)
+ {
+ return object.getQueue();
+ }
+ };
+
+ public static final BindingReadOnlyProperty<String> BINDING_KEY_PROPERTY = new BindingReadOnlyProperty<String>("bindingKey")
+ {
+ public String getValue(BindingConfig object)
+ {
+ return object.getBindingKey();
+ }
+ };
+
+ public static final BindingReadOnlyProperty<Map<String,Object>> ARGUMENTS = new BindingReadOnlyProperty<Map<String,Object>>("arguments")
+ {
+ public Map<String,Object> getValue(BindingConfig object)
+ {
+ return object.getArguments();
+ }
+ };
+
+ public static final BindingReadOnlyProperty<String> ORIGIN_PROPERTY = new BindingReadOnlyProperty<String>("origin")
+ {
+ public String getValue(BindingConfig object)
+ {
+ return object.getOrigin();
+ }
+ };
+
+ private static final BindingConfigType INSTANCE = new BindingConfigType();
+
+ private BindingConfigType()
+ {
+ }
+
+ public Collection<BindingProperty<?>> getProperties()
+ {
+ return Collections.unmodifiableList(BINDING_PROPERTIES);
+ }
+
+ public static BindingConfigType getInstance()
+ {
+ return INSTANCE;
+ }
+
+
+
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BridgeConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BridgeConfig.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BridgeConfig.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BridgeConfig.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration;
+
+public interface BridgeConfig extends ConfiguredObject<BridgeConfigType, BridgeConfig>
+{
+
+ boolean isDynamic();
+
+ boolean isQueueBridge();
+
+ boolean isLocalSource();
+
+ String getSource();
+
+ String getDestination();
+
+ String getKey();
+
+ String getTag();
+
+ String getExcludes();
+
+ LinkConfig getLink();
+
+ Integer getChannelId();
+
+ int getAckBatching();
+
+ long getCreateTime();
+}
\ No newline at end of file
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org