You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [10/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/config.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/config.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/config.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ -    http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<broker>
+    <prefix>${QPID_HOME}</prefix>
+    <work>${prefix}/work</work>
+    <conf>${prefix}/etc</conf>
+    <connector>
+        <ssl>false</ssl>
+        <nonssl>true</nonssl>
+        <transport>nio</transport>
+        <port>5672</port>
+        <sslport>8672</sslport>
+        <socketReceiveBuffer>32768</socketReceiveBuffer>
+        <socketSendBuffer>32768</socketSendBuffer>
+    </connector>
+    <management>
+        <enabled>true</enabled>
+    </management>
+    <advanced>
+        <filterchain enableExecutorPool="true"/>
+        <enablePooledAllocator>false</enablePooledAllocator>
+        <enableDirectBuffers>false</enableDirectBuffers>
+        <framesize>65535</framesize>
+    </advanced>
+    <security>
+        <principal-databases>
+            <principal-database>
+                <name>passwordfile</name>
+                <class>org.apache.qpid.server.security.auth.PasswordFilePrincipalDatabase</class>
+                <attributes>
+                    <attribute>
+                        <name>passwordFile</name>
+                        <value>${conf}/passwd</value>
+                    </attribute>
+                </attributes>
+            </principal-database>
+        </principal-databases>
+        <sasl>
+            <mechanisms>
+                <mechanism>
+                    <initialiser>
+                        <class>org.apache.qpid.server.security.auth.CRAMMD5Initialiser</class>
+                        <principal-database>passwordfile</principal-database>
+                    </initialiser>
+                </mechanism>
+                <mechanism>
+                    <initialiser>
+                        <class>org.apache.qpid.server.security.auth.amqplain.AmqPlainInitialiser</class>
+                        <principal-database>passwordfile</principal-database>
+                    </initialiser>
+                </mechanism>
+                <mechanism>
+                    <initialiser>
+                        <class>org.apache.qpid.server.security.auth.plain.PlainInitialiser</class>
+                        <principal-database>passwordfile</principal-database>
+                    </initialiser>
+                </mechanism>
+            </mechanisms>
+        </sasl>
+    </security>
+    <heartbeat>
+        <delay>0</delay>
+        <timeoutFactor>2.0</timeoutFactor>
+    </heartbeat>
+    <queue>
+        <auto_register>true</auto_register>
+    </queue>
+    <store>
+        <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+        <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+        <environment-path>${work}/bdb</environment-path>
+    </store>
+    <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>

Propchange: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ -    http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="FileAppender" class="org.apache.log4j.FileAppender">
+        <param name="File" value="${QPID_HOME}/log/qpid.log"/>
+        <param name="Append" value="false"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/>
+        </layout>
+    </appender>
+
+    <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+        </layout>
+    </appender>
+
+    <!--<category name="org.apache.qpid.server.store">
+        <priority value="debug"/>
+    </category>-->
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FileAppender"/>
+    </root>
+</log4j:configuration>

Propchange: incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/etc/passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/passwd?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/passwd (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/passwd Tue Sep 19 15:06:50 2006
@@ -0,0 +1 @@
+guest:guest

Added: incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf Tue Sep 19 15:06:50 2006
@@ -0,0 +1,22 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+QPID_LIBS=$QPID_HOME/lib/broker-launch.jar:$QPID_HOME/lib/bdbstore-launch.jar
+
+export JAVA=java \
+       JAVA_VM=-server \
+       JAVA_MEM=-Xmx1024m \
+       CLASSPATH=$QPID_LIBS

Added: incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ -    http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<virtualhosts>
+    <virtualhost>
+        <path>/development</path>
+        <bind>direct://amq.direct//queue</bind>
+        <bind>direct://amq.direct//ping</bind>
+    </virtualhost>
+</virtualhosts>

Propchange: incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/log4j.properties?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/log4j.properties (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/log4j.properties Tue Sep 19 15:06:50 2006
@@ -0,0 +1,6 @@
+log4j.rootCategory=${amqj.logging.level}, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n

Propchange: incubator/qpid/trunk/qpid/java/broker/src/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,702 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.DefaultManagedObject;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AMQChannel implements Managable
+{
+    public static final int DEFAULT_PREFETCH = 5000;
+
+    private static final Logger _log = Logger.getLogger(AMQChannel.class);
+
+    private final int _channelId;
+
+    private final String _channelName;
+
+    private boolean _transactional;
+
+    private long _prefetchCount;
+
+    /**
+     * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
+     * value of this represents the <b>last</b> tag sent out
+     */
+    private long _deliveryTag;
+
+    /**
+     * A channel has a default queue (the last declared) that is used when no queue name is
+     * explicitly set
+     */
+    private AMQQueue _defaultQueue;
+
+    /**
+     * This tag is unique per subscription to a queue. The server returns this in response to a
+     * basic.consume request.
+     */
+    private int _consumerTag = 0;
+
+    /**
+     * The current message - which may be partial in the sense that not all frames have been received yet -
+     * which has been received by this channel. As the frames are received the message gets updated and once all
+     * frames have been received the message can then be routed.
+     */
+    private AMQMessage _currentMessage;
+
+    /**
+     * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+     */
+    private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
+
+    private final MessageStore _messageStore;
+
+    private final Object _unacknowledgedMessageMapLock = new Object();
+
+    private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+
+    private final AtomicBoolean _suspended = new AtomicBoolean(false);
+
+    private final MessageRouter _exchanges;
+
+    private final TxnBuffer _txnBuffer;
+
+    private final AMQChannelMBean _managedObject;
+
+    /**
+     * Returns the management bean that was created for this channel in the constructor.
+     *
+     * @return this channel's managed object
+     */
+    public ManagedObject getManagedObject()
+    {
+        return this._managedObject;
+    }
+
+    /**
+     * Management interface implemented by {@link AMQChannelMBean}. It declares no members of its
+     * own; every attribute and operation comes from {@link ManagedChannel}.
+     */
+    public interface AMQChannelMBeanMBean extends ManagedChannel
+    {
+
+    }
+
+    /**
+     * AMQChannelMBean.  It implements the management interface exposed for
+     * monitoring and managing the channel.
+     */
+    public final class AMQChannelMBean extends DefaultManagedObject implements AMQChannelMBeanMBean
+    {
+        public AMQChannelMBean()
+        {
+            super(ManagedChannel.class, ManagedChannel.TYPE);
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _channelName;
+        }
+
+        public boolean isTransactional()
+        {
+            return _transactional;
+        }
+
+        public int getUnacknowledgedMessageCount()
+        {
+            return _unacknowledgedMessageMap.size();
+        }
+
+        public void commitTransactions() throws JMException
+        {
+            try
+            {
+                if (_transactional)
+                {
+                    _txnBuffer.commit();
+                }
+            }
+            catch(AMQException ex)
+            {
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        public void rollbackTransactions() throws JMException
+        {
+            if (_transactional)
+            {
+                synchronized (_txnBuffer)
+                {
+                    try
+                    {
+                        _txnBuffer.rollback();
+                    }
+                    catch(AMQException ex)
+                    {
+                        throw new MBeanException(ex, ex.toString());
+                    }
+                }
+            }
+        }
+
+    } // End of MBean class
+
+
+    /**
+     * Record of a message that has been delivered on the channel but not yet acknowledged:
+     * the message itself, the consumer tag it was delivered under, and the queue it came from.
+     * The queue reference is mutable and is set to null when that queue is deleted.
+     */
+    public static class UnacknowledgedMessage
+    {
+        public final AMQMessage message;
+        public final String consumerTag;
+        public AMQQueue queue;
+
+        public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag)
+        {
+            this.message = message;
+            this.consumerTag = consumerTag;
+            this.queue = queue;
+        }
+
+        /**
+         * Dequeues the message from its queue, if the queue reference is still set, and then
+         * releases one reference on the message.
+         *
+         * @throws AMQException if dequeuing or decrementing the reference fails
+         */
+        private void discard() throws AMQException
+        {
+            final boolean queueStillPresent = (this.queue != null);
+            if (queueStillPresent)
+            {
+                this.message.dequeue(this.queue);
+            }
+            this.message.decrementReference();
+        }
+    }
+
+    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+        throws AMQException
+    {
+        _channelId = channelId;
+        _channelName = _channelId + "-" + this.hashCode();
+        _prefetchCount = DEFAULT_PREFETCH;
+        _messageStore = messageStore;
+        _exchanges = exchanges;
+        _txnBuffer = new TxnBuffer(_messageStore);
+
+        _managedObject = new AMQChannelMBean();
+        _managedObject.register();
+    }
+
+    /**
+     * @return the id this channel was constructed with
+     */
+    public int getChannelId()
+    {
+        return this._channelId;
+    }
+
+    /**
+     * @return true if publishes and acknowledgements on this channel are buffered until commit
+     */
+    public boolean isTransactional()
+    {
+        return this._transactional;
+    }
+
+    /**
+     * Switches transactional mode on or off for this channel.
+     *
+     * @param transactional the new value of the transactional flag
+     */
+    public void setTransactional(boolean transactional)
+    {
+        this._transactional = transactional;
+    }
+
+    /**
+     * @return the number of unacknowledged messages at which the channel becomes suspended
+     */
+    public long getPrefetchCount()
+    {
+        return this._prefetchCount;
+    }
+
+    /**
+     * Sets the prefetch count. The suspension state is not re-evaluated here; that happens the
+     * next time a message is added to or acknowledged on the channel.
+     *
+     * @param prefetchCount the new prefetch count
+     */
+    public void setPrefetchCount(long prefetchCount)
+    {
+        this._prefetchCount = prefetchCount;
+    }
+
+    /**
+     * Starts a new current message from a publish frame, replacing any message that was still
+     * being assembled, and records who published it.
+     *
+     * @param publishBody the publish frame that opens the message
+     * @param publisher   the protocol session the frame arrived on
+     * @throws AMQException if the message cannot be created
+     */
+    public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+    {
+        final AMQMessage incoming = new AMQMessage(_messageStore, publishBody);
+        _currentMessage = incoming;
+        incoming.setPublisher(publisher);
+    }
+
+    public void publishContentHeader(ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+        if (_currentMessage == null)
+        {
+            throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
+        }
+        else
+        {
+            _currentMessage.setContentHeaderBody(contentHeaderBody);
+            // check and route if header says body length is zero
+            if (contentHeaderBody.bodySize == 0)
+            {
+                routeCurrentMessage();
+            }
+        }
+    }
+
+    /**
+     * Adds a content body frame to the current message and routes the message once all of its
+     * content has been received.
+     *
+     * @param contentBody the content body frame received on this channel
+     * @throws AMQException if no publish frame or no content header has been received for the
+     *                      current message, or if routing the message fails
+     */
+    public void publishContentBody(ContentBody contentBody)
+            throws AMQException
+    {
+        if (_currentMessage == null)
+        {
+            // the frame that opens a message on this channel is the publish frame given to setPublishFrame
+            throw new AMQException("Received content body without previously receiving a BasicPublish frame");
+        }
+        if (_currentMessage.getContentHeaderBody() == null)
+        {
+            throw new AMQException("Received content body without previously receiving a content header");
+        }
+
+        _currentMessage.addContentBodyFrame(contentBody);
+        if (_currentMessage.isAllContentReceived())
+        {
+            routeCurrentMessage();
+        }
+    }
+
+    /**
+     * Hands the completed current message on and clears it. On a non-transactional channel the
+     * message is routed immediately and one reference is released; on a transactional channel
+     * the routing is enlisted in the transaction buffer and deferred until commit.
+     *
+     * @throws AMQException if routing, enlisting or releasing the reference fails; the current
+     *                      message is left in place in that case
+     */
+    protected void routeCurrentMessage() throws AMQException
+    {
+        if (!_transactional)
+        {
+            _exchanges.routeContent(_currentMessage);
+            _currentMessage.decrementReference();
+        }
+        else
+        {
+            //don't route this until commit
+            _txnBuffer.enlist(new Publish(_currentMessage));
+        }
+        _currentMessage = null;
+    }
+
+    /**
+     * Advances the channel's delivery tag counter by one.
+     *
+     * @return the new (already incremented) delivery tag
+     */
+    public long getNextDeliveryTag()
+    {
+        _deliveryTag += 1;
+        return _deliveryTag;
+    }
+
+    /**
+     * Advances the channel's consumer tag counter by one.
+     *
+     * @return the new (already incremented) consumer tag number
+     */
+    public int getNextConsumerTag()
+    {
+        _consumerTag += 1;
+        return _consumerTag;
+    }
+
+    /**
+     * Subscribe to a queue. Every subscription is recorded in the channel so that all of them
+     * can be cleaned up when the channel is closed, even if the client never unsubscribes
+     * explicitly.
+     *
+     * @param tag     the tag chosen by the client (if null, the server generates one of the form
+     *                <code>sgen_N</code>)
+     * @param queue   the queue to subscribe to
+     * @param session the protocol session of the subscriber
+     * @param acks    passed through unchanged to the queue when the session is registered
+     * @return the consumer tag. This is returned to the subscriber and used in
+     *         subsequent unsubscribe requests
+     * @throws ConsumerTagNotUniqueException if the tag is already in use on this channel
+     * @throws AMQException                  if something goes wrong
+     */
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    {
+        final String consumerTag = (tag == null) ? "sgen_" + getNextConsumerTag() : tag;
+
+        if (_consumerTag2QueueMap.containsKey(consumerTag))
+        {
+            throw new ConsumerTagNotUniqueException();
+        }
+
+        // register with the queue first so a failed registration leaves no entry in the map
+        queue.registerProtocolSession(session, _channelId, consumerTag, acks);
+        _consumerTag2QueueMap.put(consumerTag, queue);
+        return consumerTag;
+    }
+
+
+    public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
+    {
+        AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+        if (q != null)
+        {
+            q.unregisterProtocolSession(session, _channelId, consumerTag);
+        }
+        else
+        {
+            throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
+                    _channelId);
+        }
+    }
+
+    /**
+     * Called from the protocol session to close this channel and clean up. In order: any
+     * pending transaction is rolled back, all consumers are unsubscribed, outstanding
+     * unacknowledged messages are handed back to their queues, and the management bean is
+     * unregistered.
+     *
+     * @param session the protocol session this channel belongs to; used to unregister consumers
+     * @throws AMQException if there is an error during closure
+     */
+    public void close(AMQProtocolSession session) throws AMQException
+    {
+        if (_transactional)
+        {
+            // same monitor as rollback(), so close and rollback cannot run concurrently
+            synchronized (_txnBuffer)
+            {
+                _txnBuffer.rollback();//releases messages
+            }
+        }
+        // unsubscribe before requeueing the unacknowledged messages
+        unsubscribeAllConsumers(session);
+        requeue();
+        _managedObject.unregister();
+    }
+
+    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+    {
+        _log.info("Unsubscribing all consumers on channel " + toString());
+        for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+        {
+            me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+        }
+        _consumerTag2QueueMap.clear();
+    }
+
+    /**
+     * Add a message to the channel-based map of unacknowledged messages and re-evaluate whether
+     * the channel should be suspended.
+     *
+     * @param message     the message that was delivered
+     * @param deliveryTag the delivery tag it was sent out with; used as the map key
+     * @param consumerTag the consumer tag it was delivered under
+     * @param queue       the queue the message was delivered from
+     */
+    public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
+    {
+        final UnacknowledgedMessage pending = new UnacknowledgedMessage(queue, message, consumerTag);
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            _unacknowledgedMessageMap.put(deliveryTag, pending);
+            checkSuspension();
+        }
+    }
+
+    /**
+     * Called to attempt to re-enqueue all outstanding unacknowledged messages on the channel.
+     * May result in delivery to this same channel or to other subscribers. Entries whose queue
+     * reference has been cleared are dropped without redelivery.
+     *
+     * @throws AMQException if delivering a message back to its queue fails
+     */
+    public void requeue() throws AMQException
+    {
+        // swap in a fresh map under the lock: redelivered messages get new delivery tags,
+        // so none of the old entries can be reused
+        final Map<Long, UnacknowledgedMessage> outstanding;
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            outstanding = _unacknowledgedMessageMap;
+            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+        }
+
+        final Iterator<UnacknowledgedMessage> pending = outstanding.values().iterator();
+        while (pending.hasNext())
+        {
+            final UnacknowledgedMessage entry = pending.next();
+            if (entry.queue == null)
+            {
+                continue;
+            }
+            entry.queue.deliver(entry.message);
+        }
+    }
+
+    /**
+     * Called to resend all outstanding unacknowledged messages to this same channel. Each
+     * message is written again with its original consumer tag and delivery tag; the map of
+     * unacknowledged messages is not modified.
+     *
+     * @param session the protocol session the frames are written to
+     */
+    public void resend(AMQProtocolSession session)
+    {
+        //messages go to this channel
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            for (Map.Entry<Long, UnacknowledgedMessage> pending : _unacknowledgedMessageMap.entrySet())
+            {
+                final UnacknowledgedMessage unacked = pending.getValue();
+                final long deliveryTag = pending.getKey();
+
+                session.writeFrame(unacked.message.getDataBlock(_channelId, unacked.consumerTag, deliveryTag));
+            }
+        }
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
+     * messages to remove the queue reference and also decrement any message reference counts, without
+     * actually removing the item since we may get an ack for a delivery tag that was generated from the
+     * deleted queue.
+     *
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            for (UnacknowledgedMessage candidate : _unacknowledgedMessageMap.values())
+            {
+                // identity comparison is intended: we are looking for this exact queue instance
+                if (candidate.queue != queue)
+                {
+                    continue;
+                }
+
+                candidate.queue = null;
+                try
+                {
+                    candidate.message.decrementReference();
+                }
+                catch (AMQException e)
+                {
+                    // log and carry on so the remaining entries are still processed
+                    _log.error("Error decrementing ref count on message " + candidate.message.getMessageId() + ": " +
+                            e, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Acknowledge one or more messages.
+     *
+     * @param deliveryTag the last delivery tag
+     * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
+     *                    acknowledges the single message specified by the delivery tag
+     * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+    {
+        if (_transactional)
+        {
+            //don't handle this until commit
+            _txnBuffer.enlist(new Ack(deliveryTag, multiple));
+        }
+        else
+        {
+            handleAcknowledgement(deliveryTag, multiple);
+        }
+    }
+
+    /**
+     * Removes the acknowledged entries from the map of unacknowledged messages, discards them,
+     * and then re-evaluates whether the channel should be suspended.
+     *
+     * @param deliveryTag the delivery tag being acknowledged; zero together with
+     *                    <code>multiple</code> means "everything outstanding"
+     * @param multiple    if true, acknowledges every entry up to and including the delivery tag
+     *                    in the map's iteration order; if false, only the entry with that tag
+     * @throws AMQException if the delivery tag is not outstanding on this channel, or if
+     *                      discarding a message fails
+     */
+    private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
+    {
+        if (multiple)
+        {
+            LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+            // entries are only collected under the lock; they are discarded after it is released
+            synchronized (_unacknowledgedMessageMapLock)
+            {
+                if (deliveryTag == 0)
+                {
+                    //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding messages.
+                    _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
+                    acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
+                    _unacknowledgedMessageMap.clear();
+                }
+                else
+                {
+                    // check first so that an unknown tag does not remove anything from the map
+                    if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+                    {
+                        throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+                    }
+                    // the map is a LinkedHashMap, so this walks entries in insertion order from the oldest
+                    Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+                    while (i.hasNext())
+                    {
+                        Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+                        i.remove();
+                        acked.add(unacked.getValue());
+                        // Long compared with long: the key is unboxed, so this is a numeric comparison
+                        if (unacked.getKey() == deliveryTag)
+                        {
+                            break;
+                        }
+                    }
+                }
+            }
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
+                        acked.size() + " items.");
+            }
+
+            for (UnacknowledgedMessage msg : acked)
+            {
+                msg.discard();
+            }
+
+        }
+        else
+        {
+            UnacknowledgedMessage msg;
+            synchronized (_unacknowledgedMessageMapLock)
+            {
+                msg = _unacknowledgedMessageMap.remove(deliveryTag);
+            }
+            if (msg == null)
+            {
+                throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+            }
+            msg.discard();
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+            }
+        }
+
+        // the map has shrunk, so the channel may now be below its prefetch limit
+        checkSuspension();
+    }
+
+    /**
+     * Used only for testing purposes. The channel's live map is returned, not a copy, and no
+     * lock is taken.
+     *
+     * @return the map of unacknowledged messages, keyed by delivery tag
+     */
+    public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+    {
+        return this._unacknowledgedMessageMap;
+    }
+
+    private void checkSuspension()
+    {
+        boolean suspend;
+        //noinspection SynchronizeOnNonFinalField
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
+        }
+        setSuspended(suspend);
+    }
+
+    public void setSuspended(boolean suspended)
+    {
+        boolean wasSuspended = _suspended.getAndSet(suspended);
+        if (wasSuspended != suspended)
+        {
+            if (wasSuspended)
+            {
+                _log.info("Unsuspending channel " + this);
+                //may need to deliver queued messages
+                for (AMQQueue q : _consumerTag2QueueMap.values())
+                {
+                    q.deliverAsync();
+                }
+            }
+            else
+            {
+                _log.info("Suspending channel " + this);
+            }
+        }
+    }
+
+    /**
+     * @return the current value of the channel's suspension flag
+     */
+    public boolean isSuspended()
+    {
+        final boolean suspendedNow = _suspended.get();
+        return suspendedNow;
+    }
+
+    /**
+     * Commits the operations enlisted in this channel's transaction buffer.
+     *
+     * @throws AMQException if the commit fails
+     */
+    public void commit() throws AMQException
+    {
+        this._txnBuffer.commit();
+    }
+
+    /**
+     * Rolls back the operations enlisted in this channel's transaction buffer while holding
+     * the buffer's monitor.
+     *
+     * @throws AMQException if the rollback fails
+     */
+    public void rollback() throws AMQException
+    {
+        // close() takes the same monitor before rolling back, so the two cannot interleave
+        synchronized (this._txnBuffer)
+        {
+            this._txnBuffer.rollback();
+        }
+    }
+
+    /**
+     * @return a description of the channel giving its id, transaction mode and prefetch count
+     */
+    public String toString()
+    {
+        return "Channel: id " + _channelId
+                + ", transaction mode: " + _transactional
+                + ", prefetch count: " + _prefetchCount;
+    }
+
+    /**
+     * Builds a JMX object name for the channel from its id and its current transactional flag.
+     *
+     * @return the object name, of the form <code>Channel:id=N,transaction mode=B</code>
+     * @throws MalformedObjectNameException if the generated string is not a valid object name
+     */
+    public ObjectName getObjectName()
+            throws MalformedObjectNameException
+    {
+        final String canonicalName = "Channel:id=" + _channelId + ",transaction mode=" + _transactional;
+        return new ObjectName(canonicalName);
+    }
+
+    /**
+     * Sets the queue that is used when no queue name is explicitly given.
+     *
+     * @param queue the new default queue for this channel
+     */
+    public void setDefaultQueue(AMQQueue queue)
+    {
+        this._defaultQueue = queue;
+    }
+
+    /**
+     * @return the channel's default queue, or null if none has been set
+     */
+    public AMQQueue getDefaultQueue()
+    {
+        return this._defaultQueue;
+    }
+
+    /**
+     * Transactional operation that records an acknowledgement and applies it to the enclosing
+     * channel only when the transaction commits.
+     */
+    private class Ack implements TxnOp
+    {
+        private final long _msgId;
+        private final boolean _multi;
+
+        /**
+         * @param deliveryTag the delivery tag being acknowledged
+         * @param multiple    whether the acknowledgement covers multiple messages
+         */
+        Ack(long deliveryTag, boolean multiple)
+        {
+            this._msgId = deliveryTag;
+            this._multi = multiple;
+        }
+
+        public void commit() throws AMQException
+        {
+            AMQChannel.this.handleAcknowledgement(this._msgId, this._multi);
+        }
+
+        public void rollback()
+        {
+            // nothing was changed when the ack was enlisted, so there is nothing to undo
+        }
+    }
+
+    //TODO:
+    //implement a scheme whereby messages can be stored on disk
+    //until commit, then reloaded...
+    /**
+     * Transactional operation that holds a fully received message and routes it only when the
+     * transaction commits. Either outcome releases one reference on the message.
+     */
+    private class Publish implements TxnOp
+    {
+        private final AMQMessage _msg;
+
+        /**
+         * @param message the message to route on commit
+         */
+        Publish(AMQMessage message)
+        {
+            this._msg = message;
+        }
+
+        public void commit() throws AMQException
+        {
+            AMQChannel.this._exchanges.routeContent(this._msg);
+            this._msg.decrementReference();
+        }
+
+        public void rollback()
+        {
+            try
+            {
+                this._msg.decrementReference();
+            }
+            catch (AMQException e)
+            {
+                // rollback cannot propagate the exception, so it is logged instead
+                _log.error("Error rolling back a publish request: " + e, e);
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,22 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+/**
+ * Checked exception carrying no message or cause.
+ * NOTE(review): judging by the name it signals a consumer tag that is already in use;
+ * the throwing code is not visible here - confirm.
+ */
+public class ConsumerTagNotUniqueException extends Exception
+{
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,612 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ConnectorConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.commons.cli.*;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.StringTokenizer;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Main entry point for AMQPD.
+ *
+ */
+public class Main implements ProtocolVersionList
+{
+    private static final Logger _logger = Logger.getLogger(Main.class);
+
+    private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+    private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+
+    /**
+     * Raised during startup for problems that are reported to the user by message only
+     * (execute() prints the message without a stack trace).
+     */
+    protected static class InitException extends Exception
+    {
+        InitException(String message)
+        {
+            super(message);
+        }
+    }
+
+    protected final Options options = new Options();
+    protected CommandLine commandLine;
+
+    /**
+     * Registers the supported command line options, parses the given arguments and,
+     * when parsing succeeds, runs the requested action.
+     *
+     * @param args the raw command line arguments
+     */
+    protected Main(String[] args)
+    {
+        setOptions(options);
+        if (!parseCommandline(args))
+        {
+            // usage has already been printed by parseCommandline
+            return;
+        }
+        execute();
+    }
+
+    /**
+     * Parses the arguments against the registered options and stores the result in
+     * {@link #commandLine}. On a parse error the error and the usage text are printed.
+     *
+     * @param args the raw command line arguments
+     * @return true if the arguments were parsed, false if they were rejected
+     */
+    protected boolean parseCommandline(String[] args)
+    {
+        boolean parsed;
+        try
+        {
+            commandLine = new PosixParser().parse(options, args);
+            parsed = true;
+        }
+        catch (ParseException e)
+        {
+            System.err.println("Error: " + e.getMessage());
+            new HelpFormatter().printHelp("Qpid", options, true);
+            parsed = false;
+        }
+        return parsed;
+    }
+
+    /**
+     * Adds the broker's command line options (help, version, config, logconfig, logwatch,
+     * port and bind) to the given option set.
+     *
+     * @param options the option set to populate
+     */
+    protected void setOptions(Options options)
+    {
+        options.addOption(new Option("h", "help", false, "print this message"));
+        options.addOption(new Option("v", "version", false, "print the version information and exit"));
+
+        options.addOption(OptionBuilder.withArgName("file").hasArg()
+                .withDescription("use given configuration file")
+                .withLongOpt("config").create("c"));
+
+        options.addOption(OptionBuilder.withArgName("logconfig").hasArg()
+                .withDescription("use the specified log4j xml configuration file. By " +
+                        "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME +
+                        " in the same directory as the configuration file")
+                .withLongOpt("logconfig").create("l"));
+
+        options.addOption(OptionBuilder.withArgName("logwatch").hasArg()
+                .withDescription("monitor the log file configuration file for changes. Units are seconds. " +
+                        "Zero means do not check for changes.")
+                .withLongOpt("logwatch").create("w"));
+
+        options.addOption(OptionBuilder.withArgName("port").hasArg()
+                .withDescription("listen on the specified port. Overrides any value in the config file")
+                .withLongOpt("port").create("p"));
+
+        options.addOption(OptionBuilder.withArgName("bind").hasArg()
+                .withDescription("bind to the specified address. Overrides any value in the config file")
+                .withLongOpt("bind").create("b"));
+    }
+
+    /**
+     * Acts on the parsed command line: prints usage for -h/--help, prints the broker and
+     * supported protocol versions for -v/--version, and otherwise starts the broker.
+     * Startup failures are printed here and are not propagated to the caller.
+     */
+    protected void execute()
+    {
+        // note this understands either --help or -h. If an option only has a long name you can use that but if
+        // an option has a short name and a long name you must use the short name here.
+        if (commandLine.hasOption("h"))
+        {
+            new HelpFormatter().printHelp("Qpid", options, true);
+            return;
+        }
+
+        if (commandLine.hasOption("v"))
+        {
+            String ver = "Qpid 0.9.0.0";
+            StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+            for (int i = 0; i < pv.length; i++)
+            {
+                if (i > 0)
+                {
+                    protocol.append(", ");
+                }
+                protocol.append(pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR]);
+            }
+            System.out.println(ver + " (" + protocol + ")");
+            return;
+        }
+
+        try
+        {
+            startup();
+        }
+        catch (InitException e)
+        {
+            // InitException messages are written for the user, so no stack trace is printed
+            System.out.println(e.getMessage());
+        }
+        catch (ConfigurationException e)
+        {
+            System.out.println("Error configuring message broker: " + e);
+            e.printStackTrace();
+        }
+        catch (Exception e)
+        {
+            System.out.println("Error initialising message broker: " + e);
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * Starts the broker: locates the configuration file, configures logging, initialises the
+     * application registry, applies the connector settings, loads any configured virtual
+     * hosts, binds the listening sockets and registers the broker MBean.
+     *
+     * @throws InitException if the configuration file does not exist or the port given on
+     *                       the command line is not a number
+     * @throws ConfigurationException propagated from loading the virtual host configuration
+     * @throws Exception for any other failure raised by the steps above
+     */
+    protected void startup() throws InitException, ConfigurationException, Exception
+    {
+        final String QpidHome = System.getProperty("QPID_HOME");
+        final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+        final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+        if (!configFile.exists())
+        {
+            String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
+
+            if (QpidHome == null)
+            {
+                // name the property exactly as it is looked up above so the hint is actionable
+                error = error + "\nNote: QPID_HOME is not set.";
+            }
+
+            throw new InitException(error);
+        }
+        System.out.println("Using configuration file " + configFile.getAbsolutePath());
+
+        // an explicit -l option wins; otherwise look for the default file next to the configuration file
+        String logConfig = commandLine.getOptionValue("l");
+        String logWatchConfig = commandLine.getOptionValue("w", "0");
+        File logConfigFile = (logConfig != null)
+                ? new File(logConfig)
+                : new File(configFile.getParentFile(), DEFAULT_LOG_CONFIG_FILENAME);
+        configureLogging(logConfigFile, logWatchConfig);
+
+        ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(configFile));
+
+        _logger.info("Starting Qpid.AMQP broker");
+
+        ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
+                getConfiguredObject(ConnectorConfiguration.class);
+
+        ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
+
+        // the MINA default is currently to use the pooled allocator although this may change in future
+        // once more testing of the performance of the simple allocator has been done
+        if (!connectorConfig.enablePooledAllocator)
+        {
+            ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+        }
+
+        // a port given on the command line overrides the configured one
+        int port = connectorConfig.port;
+        String portStr = commandLine.getOptionValue("p");
+        if (portStr != null)
+        {
+            try
+            {
+                port = Integer.parseInt(portStr);
+            }
+            catch (NumberFormatException e)
+            {
+                throw new InitException("Invalid port: " + portStr);
+            }
+        }
+
+        // the "virtualhosts" property is either a single path or a collection of paths
+        Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty("virtualhosts");
+        if (virtualHosts instanceof Collection)
+        {
+            // iterate rather than index: the value is only known to be a Collection, not a List
+            for (Object virtualHost : (Collection) virtualHosts)
+            {
+                setupVirtualHosts(configFile.getParent(), (String) virtualHost);
+            }
+        }
+        else if (virtualHosts != null)
+        {
+            setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
+        }
+
+        bind(port, connectorConfig);
+
+        createAndRegisterBrokerMBean();
+    }
+
+    /**
+     * Loads virtual host definitions from a single XML file, or from every file ending in
+     * ".xml" in a directory, and performs the bindings for each one.
+     *
+     * @param configFileParent replacement for a leading "${conf}" in configFilePath
+     * @param configFilePath   path of a virtual host XML file, or of a directory to scan; a
+     *                         path containing ".xml" anywhere is treated as a file
+     * @throws ConfigurationException if the directory cannot be listed; also declared for
+     *                                failures while loading a virtual host configuration
+     * @throws AMQException declared for failures while loading or binding a virtual host
+     * @throws URLSyntaxException declared for failures while loading or binding a virtual host
+     */
+    protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+    {
+        String configVar = "${conf}";
+
+        if (configFilePath.startsWith(configVar))
+        {
+            configFilePath = configFileParent + configFilePath.substring(configVar.length());
+        }
+
+        if (configFilePath.indexOf(".xml") != -1 )
+        {
+            VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
+            vHostConfig.performBindings();
+            return;
+        }
+
+        // the virtualhosts value is a path. Search it for XML files.
+        File virtualHostDir = new File(configFilePath);
+        String[] fileNames = virtualHostDir.list();
+        if (fileNames == null)
+        {
+            // File.list() returns null when the path is not a directory or cannot be read
+            throw new ConfigurationException("Virtual hosts path " + configFilePath +
+                                             " is not a readable directory");
+        }
+
+        for (String fileName : fileNames)
+        {
+            if (fileName.endsWith(".xml"))
+            {
+                VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath + "/" + fileName);
+                vHostConfig.performBindings();
+            }
+        }
+    }
+
+    /**
+     * Creates the acceptor, applies the socket settings from the connector configuration and
+     * binds the non-SSL and/or SSL listeners that the configuration enables. Any failure is
+     * logged rather than propagated.
+     *
+     * @param port            the port for the non-SSL listener
+     * @param connectorConfig the connector settings; its bind address is used unless one was
+     *                        given with the "b" command line option
+     */
+    protected void bind(int port, ConnectorConfiguration connectorConfig)
+    {
+        final String commandLineBindAddr = commandLine.getOptionValue("b");
+        final String bindAddr = (commandLineBindAddr != null) ? commandLineBindAddr : connectorConfig.bindAddress;
+
+        try
+        {
+            //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+            IoAcceptor acceptor = connectorConfig.createAcceptor();
+            SocketAcceptorConfig acceptorConfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+            SocketSessionConfig sessionConfig = (SocketSessionConfig) acceptorConfig.getSessionConfig();
+
+            sessionConfig.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
+            sessionConfig.setSendBufferSize(connectorConfig.socketWriteBuferSize);
+            sessionConfig.setTcpNoDelay(connectorConfig.tcpNoDelay);
+
+            // if we do not use the executor pool threading model we get the default leader follower
+            // implementation provided by MINA
+            if (connectorConfig.enableExecutorPool)
+            {
+                acceptorConfig.setThreadModel(new ReadWriteThreadModel());
+            }
+
+            if (connectorConfig.enableNonSSL)
+            {
+                AMQPFastProtocolHandler plainHandler = new AMQPProtocolProvider().getHandler();
+                // the literal "wildcard" means listen on all local addresses
+                InetSocketAddress bindAddress = bindAddr.equals("wildcard")
+                        ? new InetSocketAddress(port)
+                        : new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
+                acceptor.bind(bindAddress, plainHandler, acceptorConfig);
+                _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
+            }
+
+            if (connectorConfig.enableSSL)
+            {
+                AMQPFastProtocolHandler sslHandler = new AMQPProtocolProvider().getHandler();
+                sslHandler.setUseSSL(true);
+                try
+                {
+                    // the SSL listener is bound by port only, without the bind address
+                    acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), sslHandler, acceptorConfig);
+                    _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
+                }
+                catch (IOException e)
+                {
+                    _logger.error("Unable to listen on SSL port: " + e, e);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.error("Unable to bind service to registry: " + e, e);
+        }
+    }
+
+    /**
+     * Command line entry point; all work is done by the {@link Main} constructor.
+     *
+     * @param args the command line arguments
+     */
+    public static void main(String[] args)
+    {
+        new Main(args);
+    }
+
+    /**
+     * Converts a dotted-decimal IPv4 address such as "192.168.0.1" into its four raw bytes.
+     *
+     * @param address the address text, four decimal octets separated by dots
+     * @return the four address bytes in the order written
+     * @throws Exception if the text does not consist of exactly four octets, or an octet is
+     *                   not a number in the range 0 to 255
+     */
+    private byte[] parseIP(String address) throws Exception
+    {
+        StringTokenizer tokenizer = new StringTokenizer(address, ".");
+        // check the count up front so extra tokens cannot overrun the array
+        if (tokenizer.countTokens() != 4)
+        {
+            throw new Exception("Invalid IP address: " + address);
+        }
+
+        byte[] ip = new byte[4];
+        for (int index = 0; index < ip.length; index++)
+        {
+            String token = tokenizer.nextToken();
+            int octet;
+            try
+            {
+                // parse as int: Byte.parseByte only accepts -128..127 and so rejects octets above 127
+                octet = Integer.parseInt(token);
+            }
+            catch (NumberFormatException e)
+            {
+                throw new Exception("Error parsing IP address: " + address, e);
+            }
+            if (octet < 0 || octet > 255)
+            {
+                throw new Exception("Invalid IP address: " + address);
+            }
+            ip[index] = (byte) octet;
+        }
+        return ip;
+    }
+
+    /**
+     * Configures log4j from the given XML file, optionally re-checking it for changes, and
+     * falls back to the basic log4j configuration when the file cannot be read.
+     *
+     * @param logConfigFile  the log4j XML configuration file
+     * @param logWatchConfig the interval in seconds between checks of the file for changes,
+     *                       as text; a value that is not an integer is reported and treated
+     *                       as zero, and zero or less means the file is not watched
+     */
+    private void configureLogging(File logConfigFile, String logWatchConfig)
+    {
+        int logWatchTime = 0;
+        try
+        {
+            logWatchTime = Integer.parseInt(logWatchConfig);
+        }
+        catch (NumberFormatException e)
+        {
+            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
+                    "a non-negative integer. Using default of zero (no watching configured)");
+        }
+
+        if (!logConfigFile.exists() || !logConfigFile.canRead())
+        {
+            System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+            System.err.println("Using basic log4j configuration");
+            BasicConfigurator.configure();
+            return;
+        }
+
+        System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
+        if (logWatchTime > 0)
+        {
+            System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
+                    logWatchTime + " seconds");
+            // log4j expects the watch interval in milliseconds; multiply as long so large
+            // intervals cannot overflow int arithmetic
+            DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000L);
+        }
+        else
+        {
+            DOMConfigurator.configure(logConfigFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Creates the broker management object and registers it.
+     *
+     * @throws AMQException declared for failures while registering the management object
+     */
+    private void createAndRegisterBrokerMBean() throws AMQException
+    {
+        AMQBrokerManager brokerManager = new AMQBrokerManager();
+        brokerManager.register();
+    }
+
+    /**
+     * MBean interface implemented by AMQBrokerManager. It declares no operations of its
+     * own; everything is inherited from {@link ManagedBroker}.
+     */
+    public interface AMQBrokerManagerMBean extends ManagedBroker
+    {
+    }
+    /**
+     * AMQBrokerManager implements the broker management interface and exposes the
+     * broker level management features like creating and deleting exchanges and queues.
+     */
+    private final class AMQBrokerManager extends DefaultManagedObject
+                                         implements AMQBrokerManagerMBean
+    {
+        private final QueueRegistry    _queueRegistry;
+        private final ExchangeRegistry _exchangeRegistry;
+        private final ExchangeFactory  _exchangeFactory;
+        private final MessageStore     _messageStore;
+
+        /**
+         * Captures the registries, exchange factory and message store of the current
+         * application registry.
+         */
+        protected AMQBrokerManager()
+        {
+            super(ManagedBroker.class, ManagedBroker.TYPE);
+
+            IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+            _queueRegistry    = appRegistry.getQueueRegistry();
+            _exchangeRegistry = appRegistry.getExchangeRegistry();
+            _exchangeFactory  = appRegistry.getExchangeFactory();
+            _messageStore     = appRegistry.getMessageStore();
+        }
+
+        /**
+         * @return the name of this class, used as the instance name
+         */
+        public String getObjectInstanceName()
+        {
+            return this.getClass().getName();
+        }
+
+        /**
+         * Creates new exchange and registers it with the registry.
+         * @param exchangeName name of the exchange to create
+         * @param type         exchange type passed to the exchange factory, eg direct
+         * @param durable      durable flag passed to the exchange factory
+         * @param autoDelete   auto-delete flag passed to the exchange factory
+         * @throws JMException if an exchange with that name already exists, or (as an
+         *                     MBeanException) if creating or registering it fails
+         */
+        public void createNewExchange(String exchangeName,
+                                      String type,
+                                      boolean durable,
+                                      boolean autoDelete)
+            throws JMException
+        {
+            try
+            {
+                // hold the registry lock so the existence check and the registration are one step
+                synchronized(_exchangeRegistry)
+                {
+                    if (_exchangeRegistry.getExchange(exchangeName) != null)
+                    {
+                        throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
+                    }
+
+                    Exchange exchange = _exchangeFactory.createExchange(exchangeName,
+                                                                        type,        //eg direct
+                                                                        durable,
+                                                                        autoDelete,
+                                                                        0);          //ticket no
+                    _exchangeRegistry.registerExchange(exchange);
+                }
+            }
+            catch(AMQException ex)
+            {
+                _logger.error("Error in creating exchange " + exchangeName, ex);
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        /**
+         * Unregisters the exchange from registry.
+         * @param exchangeName name of the exchange to unregister
+         * @throws JMException (as an MBeanException) if the registry rejects the request
+         */
+        public void unregisterExchange(String exchangeName)
+            throws JMException
+        {
+            // TODO
+            // Check if the exchange is in use.
+            // Check if there are queue-bindings with the exchange and unregister
+            // when there are no bindings.
+            try
+            {
+                _exchangeRegistry.unregisterExchange(exchangeName, false);
+            }
+            catch(AMQException ex)
+            {
+                _logger.error("Error in unregistering exchange " + exchangeName, ex);
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        /**
+         * Creates a new queue and registers it with the registry and puts it
+         * in persistent storage if it is durable and not auto-delete.
+         * @param queueName  name of the queue to create
+         * @param durable    durable flag passed to the new queue
+         * @param owner      owner passed to the new queue
+         * @param autoDelete auto-delete flag passed to the new queue
+         * @throws JMException if a queue with that name already exists, or (as an
+         *                     MBeanException) if creating, storing or registering it fails
+         */
+        public void createQueue(String queueName,
+                                boolean durable,
+                                String owner,
+                                boolean autoDelete)
+            throws JMException
+        {
+            if (_queueRegistry.getQueue(queueName) != null)
+            {
+                throw new JMException("The queue \"" + queueName + "\" already exists.");
+            }
+
+            try
+            {
+                AMQQueue queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+                if (queue.isDurable() && !queue.isAutoDelete())
+                {
+                    _messageStore.createQueue(queue);
+                }
+                _queueRegistry.registerQueue(queue);
+            }
+            catch (AMQException ex)
+            {
+                _logger.error("Error in creating queue " + queueName, ex);
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        /**
+         * Deletes the queue and removes it from persistent storage.
+         * @param queueName name of the queue to delete
+         * @throws JMException if no queue with that name is registered, or (as an
+         *                     MBeanException) if deleting it fails
+         */
+        public void deleteQueue(String queueName) throws JMException
+        {
+            AMQQueue queue = _queueRegistry.getQueue(queueName);
+            if (queue == null)
+            {
+                throw new JMException("The Queue " + queueName + " is not a registered queue.");
+            }
+
+            try
+            {
+                queue.delete();
+                _messageStore.removeQueue(queueName);
+            }
+            catch (AMQException ex)
+            {
+                // log before rethrowing, as the other management operations do
+                _logger.error("Error in deleting queue " + queueName, ex);
+                throw new MBeanException(ex, ex.toString());
+            }
+        }
+
+        /**
+         * Builds the JMX object name from the management domain and this object's type.
+         * @return the object name, of the form domain:type=type
+         * @throws MalformedObjectNameException if the resulting name is not a valid object name
+         */
+        public ObjectName getObjectName() throws MalformedObjectNameException
+        {
+            return new ObjectName(ManagedObject.DOMAIN + ":type=" + getType());
+        }
+    } // End of MBean class
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.server;
+
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * Management interface through which a channel can be inspected and its
+ * transactions committed or rolled back.
+ * @author   Bhupendra Bhardwaj
+ * @version  0.1
+ */
+public interface ManagedChannel
+{
+    /** Type name for channel managed objects. */
+    String TYPE = "Channel";
+
+    //********** Attributes *****************//
+
+    /**
+     * Reports whether the channel is transactional.
+     * @return true if the channel is transactional.
+     * @throws IOException
+     */
+    boolean isTransactional() throws IOException;
+
+    /**
+     * Reports how many messages on this channel have not been acknowledged.
+     * @return number of unacknowledged messages.
+     * @throws IOException
+     */
+    int getUnacknowledgedMessageCount() throws IOException;
+
+    //********** Operations *****************//
+
+    /**
+     * Commits the transactions if the channel is transactional.
+     * @throws IOException
+     * @throws JMException
+     */
+    void commitTransactions() throws IOException, JMException;
+
+    /**
+     * Rolls back the transactions if the channel is transactional.
+     * @throws IOException
+     * @throws JMException
+     */
+    void rollbackTransactions() throws IOException, JMException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+
+import java.util.List;
+
+/**
+ * Signals that a required delivery could not be made. This could be because of
+ * the immediate flag being set and the queue having no consumers, or the mandatory
+ * flag being set and the exchange having no valid bindings.
+ * The exception keeps the publish body, content header and content bodies of the
+ * message so that a return message can be built from them.
+ */
+public abstract class RequiredDeliveryException extends AMQException
+{
+    private final String _message;
+    private final BasicPublishBody _publishBody;
+    private final ContentHeaderBody _contentHeaderBody;
+    private final List<ContentBody> _contentBodies;
+
+    /**
+     * @param message text used as the exception message and as the reply text of the return
+     * @param payload the message whose publish body, header and content bodies are kept
+     */
+    public RequiredDeliveryException(String message, AMQMessage payload)
+    {
+        this(message, payload.getPublishBody(), payload.getContentHeaderBody(), payload.getContentBodies());
+    }
+
+    /**
+     * @param message           text used as the exception message and as the reply text of the return
+     * @param publishBody       publish body of the undelivered message
+     * @param contentHeaderBody content header of the undelivered message
+     * @param contentBodies     content bodies of the undelivered message
+     */
+    public RequiredDeliveryException(String message,
+                                BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super(message);
+        _message = message;
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+        _contentBodies = contentBodies;
+    }
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public List<ContentBody> getContentBodies()
+    {
+        return _contentBodies;
+    }
+
+    /**
+     * Builds the frames that return the message: a return frame, then the content header
+     * frame, then one frame per content body, all on the given channel.
+     *
+     * @param channel the channel number placed in every frame
+     * @return the frames wrapped in a single composite data block
+     */
+    public CompositeAMQDataBlock getReturnMessage(int channel)
+    {
+        BasicReturnBody returnBody = new BasicReturnBody();
+        returnBody.exchange = _publishBody.exchange;
+        returnBody.replyCode = getReplyCode();
+        returnBody.replyText = _message;
+        returnBody.routingKey = _publishBody.routingKey;
+
+        AMQFrame returnFrame = new AMQFrame();
+        returnFrame.bodyFrame = returnBody;
+        returnFrame.channel = channel;
+
+        // two leading frames (return + content header) precede the content body frames
+        AMQFrame[] frames = new AMQFrame[2 + _contentBodies.size()];
+        int next = 0;
+        frames[next++] = returnFrame;
+        frames[next++] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
+        for (ContentBody contentBody : _contentBodies)
+        {
+            frames[next++] = ContentBody.createAMQFrame(channel, contentBody);
+        }
+
+        return new CompositeAMQDataBlock(frames);
+    }
+
+    /**
+     * @return the same value as {@link #getReplyCode()}
+     */
+    public int getErrorCode()
+    {
+        return getReplyCode();
+    }
+
+    /**
+     * @return the reply code placed in the return frame
+     */
+    public abstract int getReplyCode();
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.lang.reflect.Field;
+
+/**
+ * This class contains utilities for populating classes automatically from values pulled from configuration
+ * files.
+ */
+public class Configurator
+{
+    private static final Logger _logger = Logger.getLogger(Configurator.class);
+
+    /**
+     * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+     * currently configured but this could easily be added if required.
+     * @param instance the instance to configure
+     */
+    public static void configure(Object instance)
+    {
+        final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+
+        for (Field f : instance.getClass().getDeclaredFields())
+        {
+            Configured annotation = f.getAnnotation(Configured.class);
+            if (annotation != null)
+            {
+                setValueInField(f, instance, config, annotation);
+            }
+        }
+    }
+
+    private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation)
+    {
+        Class fieldClass = f.getType();
+        String configPath = annotation.path();
+        try
+        {
+            if (fieldClass == String.class)
+            {
+                String val = config.getString(configPath, annotation.defaultValue());
+                val = PropertyUtils.replaceProperties(val);
+                f.set(instance, val);
+            }
+            else if (fieldClass == int.class)
+            {
+                int val = config.getInt(configPath, Integer.parseInt(annotation.defaultValue()));
+                f.setInt(instance, val);
+            }
+            else if (fieldClass == long.class)
+            {
+                long val = config.getLong(configPath, Long.parseLong(annotation.defaultValue()));
+                f.setLong(instance, val);
+            }
+            else if (fieldClass == double.class)
+            {
+                double val = config.getDouble(configPath, Double.parseDouble(annotation.defaultValue()));
+                f.setDouble(instance, val);
+            }
+            else if (fieldClass == boolean.class)
+            {
+                boolean val = config.getBoolean(configPath, Boolean.parseBoolean(annotation.defaultValue()));
+                f.setBoolean(instance, val);
+            }
+            else
+            {
+                _logger.error("Unsupported field type " + fieldClass + " for " + f + " IGNORING configured value");
+            }
+        }
+        catch (PropertyException e)
+        {
+            _logger.error("Unable to expand property: " + e + " INGORING field " + f, e);
+        }
+        catch (IllegalAccessException e)
+        {
+            _logger.error("Unable to access field " + f + " IGNORING configured value");
+        }
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,217 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+
+
+import java.util.Collection;
+
+/**
+ * Loads a virtual host configuration document and applies the queue bindings it declares to the
+ * queue and exchange registries obtained from the {@link ApplicationRegistry}.
+ */
+public class VirtualHostConfiguration
+{
+    private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class);
+
+    XMLConfiguration _config;
+
+    private static final String XML_VIRTUALHOST = "virtualhost";
+    private static final String XML_PATH = "path";
+    private static final String XML_BIND = "bind";
+    private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
+    private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+
+
+    /**
+     * Loads the given configuration file and checks that it declares at least one
+     * {@code virtualhost.path} entry.
+     *
+     * @param configFile the configuration file to load
+     * @throws ConfigurationException if loading fails or no {@code virtualhost.path} is present
+     */
+    public VirtualHostConfiguration(String configFile) throws ConfigurationException
+    {
+        _logger.info("Loading Config file:" + configFile);
+
+        _config = new XMLConfiguration(configFile);
+
+        if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
+        {
+            throw new ConfigurationException(
+                    "Virtualhost Configuration document does not contain a valid virtualhost.");
+        }
+    }
+
+    /**
+     * Processes every virtual host in the loaded document and applies its bindings.
+     *
+     * @throws AMQException           if a binding cannot be applied
+     * @throws ConfigurationException if the document is inconsistent (missing path, unknown
+     *                                exchange, missing routing key)
+     * @throws URLSyntaxException     if a binding URL cannot be parsed
+     */
+    public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
+    {
+        Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
+
+        // A Collection means several virtualhost elements, which must be addressed by index
+        if (prop instanceof Collection)
+        {
+            _logger.debug("Number of VirtualHosts: " + ((Collection<?>) prop).size());
+
+            int virtualhosts = ((Collection<?>) prop).size();
+            for (int vhost = 0; vhost < virtualhosts; vhost++)
+            {
+                loadVirtualHost(vhost);
+            }
+        }
+        else
+        {
+            // Single element: use the un-indexed path
+            loadVirtualHost(-1);
+        }
+    }
+
+    /**
+     * Applies the bindings declared under one virtualhost element.
+     *
+     * @param index position of the virtualhost element, or -1 to use the un-indexed path
+     * @throws ConfigurationException if the element has no path
+     */
+    private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+    {
+        String path = XML_VIRTUALHOST;
+
+        if (index != -1)
+        {
+            path = path + "(" + index + ")";
+        }
+
+        Object prop = _config.getProperty(path + "." + XML_PATH);
+
+        if (prop == null)
+        {
+            // No path: report the orphaned binding(s), if any, in the error message
+            prop = _config.getProperty(path + "." + XML_BIND);
+            String error = "Virtual Host not defined for binding";
+
+            if (prop != null)
+            {
+                if (prop instanceof Collection)
+                {
+                    error += "s";
+                }
+
+                error += ": " + prop;
+            }
+
+            throw new ConfigurationException(error);
+        }
+
+        _logger.info("VirtualHost:'" + prop + "'");
+
+        prop = _config.getProperty(path + "." + XML_BIND);
+        if (prop instanceof Collection)
+        {
+            int bindings = ((Collection<?>) prop).size();
+            _logger.debug("Number of Bindings: " + bindings);
+            for (int dest = 0; dest < bindings; dest++)
+            {
+                loadBinding(path, dest);
+            }
+        }
+        else
+        {
+            loadBinding(path, -1);
+        }
+    }
+
+    /**
+     * Reads one binding URL from the configuration and applies it.
+     *
+     * @param rootpath configuration path of the enclosing virtualhost element
+     * @param index    position of the bind element, or -1 to use the un-indexed path
+     * @throws AMQException       rethrown from {@link #bind(AMQBindingURL)} after logging the URL
+     * @throws URLSyntaxException if the binding string cannot be parsed
+     */
+    private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
+    {
+        String path = rootpath + "." + XML_BIND;
+        if (index != -1)
+        {
+            path = path + "(" + index + ")";
+        }
+
+        String bindingString = _config.getString(path);
+
+        AMQBindingURL binding = new AMQBindingURL(bindingString);
+
+        _logger.debug("Loaded Binding:" + binding);
+
+        try
+        {
+            bind(binding);
+        }
+        catch (AMQException amqe)
+        {
+            _logger.info("Unable to bind url: " + binding);
+            throw amqe;
+        }
+    }
+
+    /**
+     * Ensures the queue named by the binding exists (creating and registering it if needed) and
+     * binds it to the binding's exchange using the binding's routing key.
+     *
+     * @param binding the parsed binding URL
+     * @throws AMQException           if the URL has no queue name
+     * @throws ConfigurationException if the exchange is unknown or the routing key is missing
+     */
+    private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+    {
+
+        String queueName = binding.getQueueName();
+
+        // This will occur if the URL is a Topic
+        if (queueName == null)
+        {
+            //todo register valid topic
+            ///queueName = binding.getDestinationName();
+            throw new AMQException("Topics cannot be bound. TODO Register valid topic");
+        }
+
+        //Get references to Broker Registries
+        QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
+        MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
+        ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+
+        synchronized (queueRegistry)
+        {
+            AMQQueue queue = queueRegistry.getQueue(queueName);
+
+            if (queue == null)
+            {
+                _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+
+                queue = new AMQQueue(queueName,
+                        Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
+                        null /* These queues will have no owner */,
+                        false /* Therefore autodelete makes no sense */, queueRegistry);
+
+                if (queue.isDurable())
+                {
+                    messageStore.createQueue(queue);
+                }
+
+                queueRegistry.registerQueue(queue);
+            }
+            else
+            {
+                _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+            }
+
+            Exchange exchange = exchangeRegistry.getExchange(binding.getExchangeName());
+
+            // The null check must precede the lock: synchronizing on a null reference throws
+            // NullPointerException, which would hide the configuration error from the caller.
+            if (exchange == null)
+            {
+                throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
+            }
+
+            synchronized (exchange)
+            {
+                exchange.registerQueue(queue.getName(), queue, null);
+
+                if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+                {
+                    throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+                }
+
+                queue.bind(binding.getRoutingKey(), exchange);
+            }
+            _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+
+/**
+ * Base class for exchanges. It holds the properties supplied to
+ * {@link #initialise(String, boolean, int, boolean)} and owns the managed (MBean) representation
+ * of the exchange.
+ */
+public abstract class AbstractExchange implements Exchange, Managable
+{
+    private String _name;
+
+    protected boolean _durable;
+
+    protected int _ticket;
+
+    protected ExchangeMBean _exchangeMbean;
+
+    /**
+     * Whether the exchange is automatically deleted once all queues have detached from it
+     */
+    protected boolean _autoDelete;
+
+    /**
+     * Abstract MBean class. It implements some of the methods of the management interface
+     * for exchanges by reading the enclosing exchange's fields. Any implementation of an
+     * Exchange MBean should extend this class.
+     */
+    protected abstract class ExchangeMBean extends DefaultManagedObject implements ManagedExchange
+    {
+        public ExchangeMBean()
+        {
+            super(ManagedExchange.class, ManagedExchange.TYPE);
+        }
+
+        /** @return the name of the enclosing exchange */
+        public String getObjectInstanceName()
+        {
+            return AbstractExchange.this._name;
+        }
+
+        /** @return the name of the enclosing exchange */
+        public String getName()
+        {
+            return AbstractExchange.this._name;
+        }
+
+        /** @return the ticket of the enclosing exchange */
+        public int getTicketNo()
+        {
+            return AbstractExchange.this._ticket;
+        }
+
+        /** @return the durable flag of the enclosing exchange */
+        public boolean isDurable()
+        {
+            return AbstractExchange.this._durable;
+        }
+
+        /** @return the auto-delete flag of the enclosing exchange */
+        public boolean isAutoDelete()
+        {
+            return AbstractExchange.this._autoDelete;
+        }
+
+    } // End of MBean class
+
+    /** @return the name given to {@link #initialise(String, boolean, int, boolean)} */
+    public String getName()
+    {
+        return this._name;
+    }
+
+    /**
+     * Factory hook for the managed representation of a concrete exchange. It is invoked from
+     * {@link #initialise(String, boolean, int, boolean)} (template method pattern).
+     * @return the MBean
+     */
+    protected abstract ExchangeMBean createMBean();
+
+    /**
+     * Stores the exchange properties, then creates and registers the MBean.
+     *
+     * @param name       the exchange name
+     * @param durable    the durable flag
+     * @param ticket     the ticket
+     * @param autoDelete the auto-delete flag
+     * @throws AMQException declared for failures during initialisation
+     */
+    public void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+    {
+        // Properties are stored first so that they are in place when createMBean() runs
+        this._name = name;
+        this._ticket = ticket;
+        this._durable = durable;
+        this._autoDelete = autoDelete;
+
+        final ExchangeMBean mbean = createMBean();
+        this._exchangeMbean = mbean;
+        mbean.register();
+    }
+
+    /** @return the durable flag */
+    public boolean isDurable()
+    {
+        return this._durable;
+    }
+
+    /** @return the auto-delete flag */
+    public boolean isAutoDelete()
+    {
+        return this._autoDelete;
+    }
+
+    /** @return the ticket */
+    public int getTicket()
+    {
+        return this._ticket;
+    }
+
+    /**
+     * Unregisters the MBean, if one has been created.
+     *
+     * @throws AMQException declared for failures while closing
+     */
+    public void close() throws AMQException
+    {
+        if (_exchangeMbean == null)
+        {
+            return;
+        }
+        _exchangeMbean.unregister();
+    }
+
+    /** @return the runtime class name followed by the exchange name in square brackets */
+    public String toString()
+    {
+        final String className = getClass().getName();
+        return className + "[" + getName() + "]";
+    }
+
+    /** @return the MBean, or {@code null} when none has been created */
+    public ManagedObject getManagedObject()
+    {
+        return this._exchangeMbean;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Exchange factory that maps an exchange type name ("direct", "topic" or "headers") to an
+ * implementation class and instantiates that class reflectively.
+ */
+public class DefaultExchangeFactory implements ExchangeFactory
+{
+    private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+
+    /** Exchange type name to implementation class. */
+    private Map<String, Class<? extends Exchange>> _exchangeClassMap = new HashMap<String, Class<? extends Exchange>>();
+
+    /**
+     * Registers the exchange implementations known to this factory.
+     */
+    public DefaultExchangeFactory()
+    {
+        _exchangeClassMap.put("direct", org.apache.qpid.server.exchange.DestNameExchange.class);
+        _exchangeClassMap.put("topic", org.apache.qpid.server.exchange.DestWildExchange.class);
+        _exchangeClassMap.put("headers", org.apache.qpid.server.exchange.HeadersExchange.class);
+    }
+
+    /**
+     * Instantiates and initialises an exchange of the requested type.
+     *
+     * @param exchange   the name passed to the new exchange
+     * @param type       the exchange type name used to select the implementation class
+     * @param durable    the durable flag passed to the new exchange
+     * @param autoDelete the auto-delete flag passed to the new exchange
+     * @param ticket     the ticket passed to the new exchange
+     * @return the initialised exchange
+     * @throws AMQException if the type is not registered, the class cannot be instantiated,
+     *                      or initialisation fails
+     */
+    public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete,
+                                   int ticket)
+            throws AMQException
+    {
+        final Class<? extends Exchange> implementation = _exchangeClassMap.get(type);
+        if (implementation == null)
+        {
+            throw new AMQException(_logger, "Unknown exchange type: " + type);
+        }
+
+        final Exchange created;
+        try
+        {
+            created = implementation.newInstance();
+        }
+        catch (InstantiationException failure)
+        {
+            throw new AMQException(_logger, "Unable to create exchange: " + failure, failure);
+        }
+        catch (IllegalAccessException failure)
+        {
+            throw new AMQException(_logger, "Unable to create exchange: " + failure, failure);
+        }
+
+        // Initialisation cannot raise either reflective exception, so it sits outside the try
+        created.initialise(exchange, durable, ticket, autoDelete);
+        return created;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native