You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [10/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/config.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/config.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/config.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${prefix}/work</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <ssl>false</ssl>
+ <nonssl>true</nonssl>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ </advanced>
+ <security>
+ <principal-databases>
+ <principal-database>
+ <name>passwordfile</name>
+ <class>org.apache.qpid.server.security.auth.PasswordFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwd</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+ <sasl>
+ <mechanisms>
+ <mechanism>
+ <initialiser>
+ <class>org.apache.qpid.server.security.auth.CRAMMD5Initialiser</class>
+ <principal-database>passwordfile</principal-database>
+ </initialiser>
+ </mechanism>
+ <mechanism>
+ <initialiser>
+ <class>org.apache.qpid.server.security.auth.amqplain.AmqPlainInitialiser</class>
+ <principal-database>passwordfile</principal-database>
+ </initialiser>
+ </mechanism>
+ <mechanism>
+ <initialiser>
+ <class>org.apache.qpid.server.security.auth.plain.PlainInitialiser</class>
+ <principal-database>passwordfile</principal-database>
+ </initialiser>
+ </mechanism>
+ </mechanisms>
+ </sasl>
+ </security>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+ <store>
+ <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdb</environment-path>
+ </store>
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
Propchange: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="FileAppender" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_HOME}/log/qpid.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <!--<category name="org.apache.qpid.server.store">
+ <priority value="debug"/>
+ </category>-->
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ <appender-ref ref="FileAppender"/>
+ </root>
+</log4j:configuration>
Propchange: incubator/qpid/trunk/qpid/java/broker/etc/log4j.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/etc/passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/passwd?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/passwd (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/passwd Tue Sep 19 15:06:50 2006
@@ -0,0 +1 @@
+guest:guest
Added: incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf Tue Sep 19 15:06:50 2006
@@ -0,0 +1,22 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+QPID_LIBS=$QPID_HOME/lib/broker-launch.jar:$QPID_HOME/lib/bdbstore-launch.jar
+
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ CLASSPATH=$QPID_LIBS
Added: incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<virtualhosts>
+ <virtualhost>
+ <path>/development</path>
+ <bind>direct://amq.direct//queue</bind>
+ <bind>direct://amq.direct//ping</bind>
+ </virtualhost>
+</virtualhosts>
Propchange: incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/log4j.properties?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/log4j.properties (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/log4j.properties Tue Sep 19 15:06:50 2006
@@ -0,0 +1,6 @@
+log4j.rootCategory=${amqj.logging.level}, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
Propchange: incubator/qpid/trunk/qpid/java/broker/src/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,702 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.DefaultManagedObject;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AMQChannel implements Managable
+{
+ public static final int DEFAULT_PREFETCH = 5000;
+
+ private static final Logger _log = Logger.getLogger(AMQChannel.class);
+
+ private final int _channelId;
+
+ private final String _channelName;
+
+ private boolean _transactional;
+
+ private long _prefetchCount;
+
+ /**
+ * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
+ * value of this represents the <b>last</b> tag sent out
+ */
+ private long _deliveryTag;
+
+ /**
+ * A channel has a default queue (the last declared) that is used when no queue name is
+ * explicitly set
+ */
+ private AMQQueue _defaultQueue;
+
+ /**
+ * This tag is unique per subscription to a queue. The server returns this in response to a
+ * basic.consume request.
+ */
+ private int _consumerTag = 0;
+
+ /**
+ * The current message - which may be partial in the sense that not all frames have been received yet -
+ * which has been received by this channel. As the frames are received the message gets updated and once all
+ * frames have been received the message can then be routed.
+ */
+ private AMQMessage _currentMessage;
+
+ /**
+ * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+ */
+ private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
+
+ private final MessageStore _messageStore;
+
+ private final Object _unacknowledgedMessageMapLock = new Object();
+
+ private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+
+ private final AtomicBoolean _suspended = new AtomicBoolean(false);
+
+ private final MessageRouter _exchanges;
+
+ private final TxnBuffer _txnBuffer;
+
+ private final AMQChannelMBean _managedObject;
+
+ public ManagedObject getManagedObject()
+ {
+ return _managedObject;
+ }
+
+ /**
+ * MBean interface for the implementation AMQChannelMBean
+ */
+ public interface AMQChannelMBeanMBean extends ManagedChannel
+ {
+
+ }
+
+ /**
+ * AMQChannelMBean. It implements the management interface exposed for
+ * monitoring and managing the channel.
+ */
+ public final class AMQChannelMBean extends DefaultManagedObject implements AMQChannelMBeanMBean
+ {
+ public AMQChannelMBean()
+ {
+ super(ManagedChannel.class, ManagedChannel.TYPE);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _channelName;
+ }
+
+ public boolean isTransactional()
+ {
+ return _transactional;
+ }
+
+ public int getUnacknowledgedMessageCount()
+ {
+ return _unacknowledgedMessageMap.size();
+ }
+
+ public void commitTransactions() throws JMException
+ {
+ try
+ {
+ if (_transactional)
+ {
+ _txnBuffer.commit();
+ }
+ }
+ catch(AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ public void rollbackTransactions() throws JMException
+ {
+ if (_transactional)
+ {
+ synchronized (_txnBuffer)
+ {
+ try
+ {
+ _txnBuffer.rollback();
+ }
+ catch(AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+ }
+ }
+
+ } // End of MBean class
+
+
+ /**
+ * Entry stored in the channel's unacknowledged message map: the message itself,
+ * the consumer tag recorded with it, and the queue associated with it.
+ */
+ public static class UnacknowledgedMessage
+ {
+ public final AMQMessage message;
+ public final String consumerTag;
+ // not final: AMQChannel.queueDeleted() sets this to null for entries of a deleted queue
+ public AMQQueue queue;
+
+ public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag)
+ {
+ this.queue = queue;
+ this.message = message;
+ this.consumerTag = consumerTag;
+ }
+
+ /**
+ * Dequeues the message from the queue, if a queue is still set, and then
+ * decrements the message's reference count.
+ *
+ * NOTE(review): AMQChannel.queueDeleted() also calls decrementReference() when it
+ * clears the queue, so an ack arriving afterwards appears to decrement a second
+ * time - confirm this is intended.
+ *
+ * @throws AMQException if the dequeue or the reference decrement throws
+ */
+ private void discard() throws AMQException
+ {
+ if (queue != null)
+ {
+ message.dequeue(queue);
+ }
+ message.decrementReference();
+ }
+ }
+
+ /**
+ * Creates a channel with the default prefetch count and a transaction buffer backed
+ * by the given message store, then creates and registers the channel's MBean.
+ *
+ * @param channelId the id of this channel
+ * @param messageStore the store handed to new messages and to the transaction buffer
+ * @param exchanges the router used to route completed messages
+ * @throws AMQException declared by this constructor; presumably raised by MBean
+ * registration - TODO confirm
+ */
+ public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+ throws AMQException
+ {
+ _channelId = channelId;
+ // channel name is the channel id followed by this object's hash code
+ _channelName = _channelId + "-" + this.hashCode();
+ _prefetchCount = DEFAULT_PREFETCH;
+ _messageStore = messageStore;
+ _exchanges = exchanges;
+ _txnBuffer = new TxnBuffer(_messageStore);
+
+ // registration is the last step, after all fields above have been assigned
+ _managedObject = new AMQChannelMBean();
+ _managedObject.register();
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public boolean isTransactional()
+ {
+ return _transactional;
+ }
+
+ public void setTransactional(boolean transactional)
+ {
+ _transactional = transactional;
+ }
+
+ public long getPrefetchCount()
+ {
+ return _prefetchCount;
+ }
+
+ public void setPrefetchCount(long prefetchCount)
+ {
+ _prefetchCount = prefetchCount;
+ }
+
+ public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+ {
+ _currentMessage = new AMQMessage(_messageStore, publishBody);
+ _currentMessage.setPublisher(publisher);
+ }
+
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ if (_currentMessage == null)
+ {
+ throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
+ }
+ else
+ {
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
+ // check and route if header says body length is zero
+ if (contentHeaderBody.bodySize == 0)
+ {
+ routeCurrentMessage();
+ }
+ }
+ }
+
+    /**
+     * Adds a content body frame to the message currently being received and routes the
+     * message once the message reports that all of its content has arrived.
+     *
+     * @param contentBody the content body frame received on this channel
+     * @throws AMQException if no publish frame or no content header has been received
+     *                      beforehand, or if adding the frame or routing throws
+     */
+    public void publishContentBody(ContentBody contentBody)
+            throws AMQException
+    {
+        if (_currentMessage == null)
+        {
+            // the current message is only ever created by setPublishFrame(), i.e. from a
+            // BasicPublishBody, so that is the frame that is missing here
+            throw new AMQException("Received content body without previously receiving a BasicPublish frame");
+        }
+        if (_currentMessage.getContentHeaderBody() == null)
+        {
+            throw new AMQException("Received content body without previously receiving a content header");
+        }
+
+        _currentMessage.addContentBodyFrame(contentBody);
+        if (_currentMessage.isAllContentReceived())
+        {
+            routeCurrentMessage();
+        }
+    }
+
+    /**
+     * Hands the completed current message on and clears it. On a non-transactional
+     * channel the message is routed immediately and its reference count decremented;
+     * on a transactional channel a Publish operation is enlisted in the transaction
+     * buffer instead, deferring the routing until commit.
+     *
+     * @throws AMQException if enlisting, routing or the reference decrement throws
+     */
+    protected void routeCurrentMessage() throws AMQException
+    {
+        if (!_transactional)
+        {
+            _exchanges.routeContent(_currentMessage);
+            _currentMessage.decrementReference();
+        }
+        else
+        {
+            //don't route this until commit
+            _txnBuffer.enlist(new Publish(_currentMessage));
+        }
+        // only reached when the branch above completed without throwing
+        _currentMessage = null;
+    }
+
+ public long getNextDeliveryTag()
+ {
+ return ++_deliveryTag;
+ }
+
+ public int getNextConsumerTag()
+ {
+ return ++_consumerTag;
+ }
+
+    /**
+     * Subscribes to a queue and records the subscription against this channel, so that
+     * closing the channel can remove every subscription even when the client never
+     * unsubscribes explicitly.
+     *
+     * @param tag     the consumer tag chosen by the client; when null the server
+     *                generates one of the form "sgen_" followed by the next consumer tag number
+     * @param queue   the queue to subscribe to
+     * @param session the protocol session of the subscriber
+     * @param acks    passed through unchanged to the queue's registerProtocolSession call
+     *                (presumably whether deliveries must be acknowledged - TODO confirm)
+     * @return the consumer tag in use for this subscription; it is returned to the
+     *         subscriber and used in later unsubscribe requests
+     * @throws ConsumerTagNotUniqueException if the tag is already in use on this channel
+     * @throws AMQException                  if registering with the queue throws
+     */
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    {
+        final String consumerTag = (tag != null) ? tag : "sgen_" + getNextConsumerTag();
+        if (_consumerTag2QueueMap.containsKey(consumerTag))
+        {
+            throw new ConsumerTagNotUniqueException();
+        }
+
+        // register with the queue first; the tag is only recorded if that succeeds
+        queue.registerProtocolSession(session, _channelId, consumerTag, acks);
+        _consumerTag2QueueMap.put(consumerTag, queue);
+        return consumerTag;
+    }
+
+
+ public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
+ {
+ AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+ if (q != null)
+ {
+ q.unregisterProtocolSession(session, _channelId, consumerTag);
+ }
+ else
+ {
+ throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
+ _channelId);
+ }
+ }
+
+    /**
+     * Called from the protocol session to close this channel and clean up: rolls back
+     * the transaction buffer of a transactional channel, unsubscribes all consumers,
+     * requeues the outstanding unacknowledged messages and unregisters the channel's MBean.
+     * <p>
+     * The MBean is unregistered even when one of the earlier steps throws, so that a
+     * failed close does not leave the registration made in the constructor behind. If
+     * unregistering itself throws, that exception is the one that propagates.
+     *
+     * @param session the protocol session that owns this channel
+     * @throws AMQException if there is an error during closure
+     */
+    public void close(AMQProtocolSession session) throws AMQException
+    {
+        try
+        {
+            if (_transactional)
+            {
+                synchronized (_txnBuffer)
+                {
+                    _txnBuffer.rollback();//releases messages
+                }
+            }
+            unsubscribeAllConsumers(session);
+            requeue();
+        }
+        finally
+        {
+            _managedObject.unregister();
+        }
+    }
+
+ private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+ {
+ _log.info("Unsubscribing all consumers on channel " + toString());
+ for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+ {
+ me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+ }
+ _consumerTag2QueueMap.clear();
+ }
+
+ /**
+ * Add a message to the channel-based map of unacknowledged messages, keyed by its
+ * delivery tag, and then re-evaluate whether the channel should be suspended.
+ *
+ * @param message the message that has been delivered
+ * @param deliveryTag the delivery tag used as the map key
+ * @param consumerTag the consumer tag stored with the message
+ * @param queue the queue stored with the message
+ */
+ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ synchronized (_unacknowledgedMessageMapLock)
+ {
+ _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
+ // compares the new map size with the prefetch count
+ checkSuspension();
+ }
+ }
+
+    /**
+     * Attempts to re-enqueue every outstanding unacknowledged message on this channel
+     * by handing it back to its queue. Redelivery may go to this same channel or to
+     * other subscribers.
+     *
+     * @throws AMQException if delivering a message to its queue throws
+     */
+    public void requeue() throws AMQException
+    {
+        // Swap in a fresh map under the lock: redelivered messages receive new delivery
+        // tags, so the old entries must not stay in the live map.
+        final Map<Long, UnacknowledgedMessage> outstanding;
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            outstanding = _unacknowledgedMessageMap;
+            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+        }
+
+        final Iterator<UnacknowledgedMessage> pending = outstanding.values().iterator();
+        while (pending.hasNext())
+        {
+            final UnacknowledgedMessage unacked = pending.next();
+            if (unacked.queue == null)
+            {
+                // no queue to hand the message back to (e.g. cleared by queueDeleted)
+                continue;
+            }
+            unacked.queue.deliver(unacked.message);
+        }
+    }
+
+    /**
+     * Writes every outstanding unacknowledged message to the given session again,
+     * on this channel and with its original consumer tag and delivery tag.
+     *
+     * @param session the protocol session the frames are written to
+     */
+    public void resend(AMQProtocolSession session)
+    {
+        //messages go to this channel
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+            {
+                final long deliveryTag = entry.getKey();
+                final UnacknowledgedMessage unacked = entry.getValue();
+
+                session.writeFrame(unacked.message.getDataBlock(_channelId, unacked.consumerTag, deliveryTag));
+            }
+        }
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted. Every unacknowledged message
+     * that refers to the queue has its queue reference cleared and its message
+     * reference count decremented. The entries themselves stay in the map, since an
+     * ack may still arrive for a delivery tag that was generated from the deleted queue.
+     *
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        synchronized (_unacknowledgedMessageMapLock)
+        {
+            for (UnacknowledgedMessage unacked : _unacknowledgedMessageMap.values())
+            {
+                // we can compare the reference safely in this case
+                if (unacked.queue != queue)
+                {
+                    continue;
+                }
+
+                unacked.queue = null;
+                try
+                {
+                    unacked.message.decrementReference();
+                }
+                catch (AMQException e)
+                {
+                    // log and carry on with the remaining entries
+                    _log.error("Error decrementing ref count on message " + unacked.message.getMessageId() + ": " +
+                               e, e);
+                }
+            }
+        }
+    }
+
+ /**
+ * Acknowledge one or more messages.
+ *
+ * @param deliveryTag the last delivery tag
+ * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
+ * acknowledges the single message specified by the delivery tag
+ * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
+ */
+ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ {
+ if (_transactional)
+ {
+ //don't handle this until commit
+ _txnBuffer.enlist(new Ack(deliveryTag, multiple));
+ }
+ else
+ {
+ handleAcknowledgement(deliveryTag, multiple);
+ }
+ }
+
+ private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
+ {
+ if (multiple)
+ {
+ LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+ synchronized (_unacknowledgedMessageMapLock)
+ {
+ if (deliveryTag == 0)
+ {
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
+ acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
+ _unacknowledgedMessageMap.clear();
+ }
+ else
+ {
+ if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ }
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+ while (i.hasNext())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+ i.remove();
+ acked.add(unacked.getValue());
+ if (unacked.getKey() == deliveryTag)
+ {
+ break;
+ }
+ }
+ }
+ }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
+ acked.size() + " items.");
+ }
+
+ for (UnacknowledgedMessage msg : acked)
+ {
+ msg.discard();
+ }
+
+ }
+ else
+ {
+ UnacknowledgedMessage msg;
+ synchronized (_unacknowledgedMessageMapLock)
+ {
+ msg = _unacknowledgedMessageMap.remove(deliveryTag);
+ }
+ if (msg == null)
+ {
+ throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+ }
+ msg.discard();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ }
+ }
+
+ checkSuspension();
+ }
+
+ /**
+ * Used only for testing purposes.
+ *
+ * @return the map of unacknowledged messages
+ */
+ public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+ {
+ return _unacknowledgedMessageMap;
+ }
+
+ private void checkSuspension()
+ {
+ boolean suspend;
+ //noinspection SynchronizeOnNonFinalField
+ synchronized (_unacknowledgedMessageMapLock)
+ {
+ suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
+ }
+ setSuspended(suspend);
+ }
+
+ public void setSuspended(boolean suspended)
+ {
+ boolean wasSuspended = _suspended.getAndSet(suspended);
+ if (wasSuspended != suspended)
+ {
+ if (wasSuspended)
+ {
+ _log.info("Unsuspending channel " + this);
+ //may need to deliver queued messages
+ for (AMQQueue q : _consumerTag2QueueMap.values())
+ {
+ q.deliverAsync();
+ }
+ }
+ else
+ {
+ _log.info("Suspending channel " + this);
+ }
+ }
+ }
+
+ public boolean isSuspended()
+ {
+ return _suspended.get();
+ }
+
+ public void commit() throws AMQException
+ {
+ _txnBuffer.commit();
+ }
+
+ public void rollback() throws AMQException
+ {
+ //need to protect rollback and close from each other...
+ synchronized (_txnBuffer)
+ {
+ _txnBuffer.rollback();
+ }
+ }
+
+    /**
+     * @return a description of the channel giving its id, transaction mode and prefetch count
+     */
+    public String toString()
+    {
+        return "Channel: id " + _channelId
+               + ", transaction mode: " + _transactional
+               + ", prefetch count: " + _prefetchCount;
+    }
+
+    /**
+     * Builds an object name in the "Channel" domain from the channel id and the
+     * current transaction mode.
+     *
+     * @return the object name for this channel
+     * @throws MalformedObjectNameException if the generated name is not a valid object name
+     */
+    public ObjectName getObjectName()
+            throws MalformedObjectNameException
+    {
+        final String name = "Channel:id=" + _channelId + ",transaction mode=" + _transactional;
+        return new ObjectName(name);
+    }
+
+ public void setDefaultQueue(AMQQueue queue)
+ {
+ _defaultQueue = queue;
+ }
+
+ public AMQQueue getDefaultQueue()
+ {
+ return _defaultQueue;
+ }
+
+ /**
+ * Transactional operation recording an acknowledgement received on a transactional
+ * channel. The acknowledgement is only handled when the transaction commits.
+ */
+ private class Ack implements TxnOp
+ {
+ // despite the name this holds the delivery tag passed to acknowledgeMessage()
+ private final long _msgId;
+ private final boolean _multi;
+
+ Ack(long msgId, boolean multi)
+ {
+ _msgId = msgId;
+ _multi = multi;
+ }
+
+ /**
+ * Handles the recorded acknowledgement against the enclosing channel.
+ */
+ public void commit() throws AMQException
+ {
+ handleAcknowledgement(_msgId, _multi);
+ }
+
+ /**
+ * Does nothing: a rolled back acknowledgement is simply never handled.
+ */
+ public void rollback()
+ {
+ }
+ }
+
+ //TODO:
+ //implement a scheme whereby messages can be stored on disk
+ //until commit, then reloaded...
+ /**
+ * Transactional operation holding a message published on a transactional channel.
+ * The message is only routed when the transaction commits.
+ */
+ private class Publish implements TxnOp
+ {
+ private final AMQMessage _msg;
+
+ Publish(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ /**
+ * Routes the held message through the channel's message router and then
+ * decrements the message's reference count.
+ */
+ public void commit() throws AMQException
+ {
+ _exchanges.routeContent(_msg);
+ _msg.decrementReference();
+ }
+
+ /**
+ * Decrements the message's reference count without routing it. A failure is
+ * logged rather than propagated, since rollback() declares no exception.
+ */
+ public void rollback()
+ {
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error rolling back a publish request: " + e, e);
+ }
+ }
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/AMQChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,22 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+/**
+ * Checked exception thrown by AMQChannel.subscribeToQueue when the requested consumer
+ * tag is already in use on the channel. It carries no message or cause.
+ */
+public class ConsumerTagNotUniqueException extends Exception
+{
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ConsumerTagNotUniqueException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,612 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ConnectorConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.commons.cli.*;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.StringTokenizer;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Main entry point for AMQPD.
+ *
+ */
+public class Main implements ProtocolVersionList
+{
+ // log4j logger used by the broker entry point and by the nested management bean.
+ private static final Logger _logger = Logger.getLogger(Main.class);
+
+ // Configuration file path, resolved against the QPID_HOME system property when -c is not given.
+ private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+ // log4j XML file looked up next to the configuration file when -l is not given.
+ private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+
+ /**
+  * Signals a start-up failure. execute() prints only the message of this exception,
+  * so the message should be complete enough to show to the user as-is.
+  */
+ protected static class InitException extends Exception
+ {
+ InitException(String msg)
+ {
+ super(msg);
+ }
+ }
+
+ // Command line switches understood by the broker; filled in by setOptions().
+ protected final Options options = new Options();
+ // Result of parsing the arguments; assigned by parseCommandline() and left null if parsing fails.
+ protected CommandLine commandLine;
+
+ protected Main(String[] args)
+ {
+ setOptions(options);
+ if (parseCommandline(args))
+ {
+ execute();
+ }
+ }
+
+ /**
+  * Parses the arguments against the registered options and stores the result in
+  * {@code commandLine}. On a parse error the error and the usage text are printed.
+  *
+  * @param args the raw command line arguments
+  * @return true if the arguments were parsed, false if they were rejected
+  */
+ protected boolean parseCommandline(String[] args)
+ {
+     boolean parsedOk;
+     try
+     {
+         commandLine = new PosixParser().parse(options, args);
+         parsedOk = true;
+     }
+     catch (ParseException parseFailure)
+     {
+         System.err.println("Error: " + parseFailure.getMessage());
+         new HelpFormatter().printHelp("Qpid", options, true);
+         parsedOk = false;
+     }
+     return parsedOk;
+ }
+
+ protected void setOptions(Options options)
+ {
+ Option help = new Option("h", "help", false, "print this message");
+ Option version = new Option("v", "version", false, "print the version information and exit");
+ Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").
+ withLongOpt("config").create("c");
+ Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file").
+ withLongOpt("port").create("p");
+ Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
+ withLongOpt("bind").create("b");
+ Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
+ "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
+ withLongOpt("logconfig").create("l");
+ Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
+ "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+
+ options.addOption(help);
+ options.addOption(version);
+ options.addOption(configFile);
+ options.addOption(logconfig);
+ options.addOption(logwatchconfig);
+ options.addOption(port);
+ options.addOption(bind);
+ }
+
+ /**
+  * Carries out the action selected on the command line: print the usage text,
+  * print the version banner, or start the broker. Start-up failures are reported
+  * on standard output and are not rethrown.
+  */
+ protected void execute()
+ {
+     // hasOption("h") matches both -h and --help. An option that has both a short and a
+     // long name must be queried by its short name; the long name only works for
+     // options that have no short name.
+     if (commandLine.hasOption("h"))
+     {
+         new HelpFormatter().printHelp("Qpid", options, true);
+         return;
+     }
+
+     if (commandLine.hasOption("v"))
+     {
+         StringBuilder banner = new StringBuilder("Qpid 0.9.0.0");
+         banner.append(" (").append("AMQP version(s) [major.minor]: ");
+         for (int idx = 0; idx < pv.length; idx++)
+         {
+             if (idx > 0)
+             {
+                 banner.append(", ");
+             }
+             banner.append(pv[idx][PROTOCOL_MAJOR] + "." + pv[idx][PROTOCOL_MINOR]);
+         }
+         banner.append(")");
+         System.out.println(banner.toString());
+         return;
+     }
+
+     try
+     {
+         startup();
+     }
+     catch (InitException initFailure)
+     {
+         System.out.println(initFailure.getMessage());
+     }
+     catch (ConfigurationException configFailure)
+     {
+         System.out.println("Error configuring message broker: " + configFailure);
+         configFailure.printStackTrace();
+     }
+     catch (Exception failure)
+     {
+         System.out.println("Error intialising message broker: " + failure);
+         failure.printStackTrace();
+     }
+ }
+
+
+ protected void startup() throws InitException, ConfigurationException, Exception
+ {
+ final String QpidHome = System.getProperty("QPID_HOME");
+ final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+ final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+ if (!configFile.exists())
+ {
+ String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
+
+ if (QpidHome == null)
+ {
+ error = error + "\nNote: Qpid_HOME is not set.";
+ }
+
+ throw new InitException(error);
+ }
+ else
+ {
+ System.out.println("Using configuration file " + configFile.getAbsolutePath());
+ }
+
+ String logConfig = commandLine.getOptionValue("l");
+ String logWatchConfig = commandLine.getOptionValue("w", "0");
+ if (logConfig != null)
+ {
+ File logConfigFile = new File(logConfig);
+ configureLogging(logConfigFile, logWatchConfig);
+ }
+ else
+ {
+ File configFileDirectory = configFile.getParentFile();
+ File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+ configureLogging(logConfigFile, logWatchConfig);
+ }
+
+ ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(configFile));
+
+ _logger.info("Starting Qpid.AMQP broker");
+
+ ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
+ getConfiguredObject(ConnectorConfiguration.class);
+
+ ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
+
+ // the MINA default is currently to use the pooled allocator although this may change in future
+ // once more testing of the performance of the simple allocator has been done
+ if (!connectorConfig.enablePooledAllocator)
+ {
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ int port = connectorConfig.port;
+
+ String portStr = commandLine.getOptionValue("p");
+ if (portStr != null)
+ {
+ try
+ {
+ port = Integer.parseInt(portStr);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + portStr);
+ }
+ }
+
+ String VIRTUAL_HOSTS = "virtualhosts";
+
+ Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty(VIRTUAL_HOSTS);
+
+ if (virtualHosts != null)
+ {
+ if (virtualHosts instanceof Collection)
+ {
+ int totalVHosts = ((Collection) virtualHosts).size();
+ for (int vhost = 0; vhost < totalVHosts; vhost++)
+ {
+ setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
+ }
+ }
+ else
+ {
+ setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
+ }
+ }
+ bind(port, connectorConfig);
+
+ createAndRegisterBrokerMBean();
+ }
+
+ /**
+  * Loads the virtual host configuration named by {@code configFilePath} and performs its bindings.
+  * A leading "${conf}" is replaced by {@code configFileParent}. A value containing ".xml" is
+  * loaded as a single file; any other value is treated as a directory whose "*.xml" entries
+  * are each loaded.
+  *
+  * @param configFileParent directory of the main configuration file, substituted for "${conf}"
+  * @param configFilePath the configured virtual hosts value: a file or a directory
+  * @throws ConfigurationException if a file cannot be loaded or the directory cannot be listed
+  * @throws AMQException if raised while performing the bindings
+  * @throws URLSyntaxException if raised while performing the bindings
+  */
+ protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+ {
+     String configVar = "${conf}";
+
+     if (configFilePath.startsWith(configVar))
+     {
+         configFilePath = configFileParent + configFilePath.substring(configVar.length());
+     }
+
+     if (configFilePath.indexOf(".xml") != -1)
+     {
+         VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
+         vHostConfig.performBindings();
+         return;
+     }
+
+     // the virtualhosts value is a path. Search it for XML files.
+     File virtualHostDir = new File(configFilePath);
+     String[] fileNames = virtualHostDir.list();
+     if (fileNames == null)
+     {
+         // File.list() returns null when the path is not a directory or cannot be read
+         throw new ConfigurationException("Virtual host configuration path " + configFilePath
+                                          + " is not a readable directory");
+     }
+
+     for (String fileName : fileNames)
+     {
+         if (fileName.endsWith(".xml"))
+         {
+             VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath + "/" + fileName);
+             vHostConfig.performBindings();
+         }
+     }
+ }
+
+ protected void bind(int port, ConnectorConfiguration connectorConfig)
+ {
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = connectorConfig.bindAddress;
+ }
+
+ try
+ {
+ //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+ IoAcceptor acceptor = connectorConfig.createAcceptor();
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+
+ sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
+ sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
+ sc.setTcpNoDelay(connectorConfig.tcpNoDelay);
+
+ // if we do not use the executor pool threading model we get the default leader follower
+ // implementation provided by MINA
+ if (connectorConfig.enableExecutorPool)
+ {
+ sconfig.setThreadModel(new ReadWriteThreadModel());
+ }
+
+ if (connectorConfig.enableNonSSL)
+ {
+ AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
+ InetSocketAddress bindAddress;
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(port);
+ }
+ else
+ {
+ bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
+ }
+ acceptor.bind(bindAddress, handler, sconfig);
+ _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
+ }
+
+ if (connectorConfig.enableSSL)
+ {
+ AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
+ handler.setUseSSL(true);
+ try
+ {
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
+ handler, sconfig);
+ _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
+ }
+ catch (IOException e)
+ {
+ _logger.error("Unable to listen on SSL port: " + e, e);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to bind service to registry: " + e, e);
+ }
+ }
+
+ /**
+  * Command line entry point. All work happens in the Main constructor, which parses
+  * the arguments and runs the selected action.
+  *
+  * @param args the command line arguments
+  */
+ public static void main(String[] args)
+ {
+
+ new Main(args);
+ }
+
+ /**
+  * Converts a dotted-decimal IPv4 address such as "192.168.1.1" into its four raw bytes.
+  *
+  * Octets are parsed as integers in the range 0-255 and then narrowed to byte, so
+  * values above 127 are accepted. The address must consist of exactly four octets.
+  *
+  * @param address the address in dotted-decimal notation
+  * @return the four address bytes in network order
+  * @throws Exception if the address is null, does not have exactly four octets, or an octet
+  *                   is not a number between 0 and 255
+  */
+ private byte[] parseIP(String address) throws Exception
+ {
+     if (address == null)
+     {
+         throw new Exception("Invalid IP address: " + address);
+     }
+
+     StringTokenizer tokenizer = new StringTokenizer(address, ".");
+     // check the count up front so that extra octets cannot overrun the array
+     if (tokenizer.countTokens() != 4)
+     {
+         throw new Exception("Invalid IP address: " + address);
+     }
+
+     byte[] ip = new byte[4];
+     for (int index = 0; index < ip.length; index++)
+     {
+         String token = tokenizer.nextToken();
+         int octet;
+         try
+         {
+             // Byte.parseByte would reject anything above 127
+             octet = Integer.parseInt(token);
+         }
+         catch (NumberFormatException e)
+         {
+             throw new Exception("Error parsing IP address: " + address, e);
+         }
+         if (octet < 0 || octet > 255)
+         {
+             throw new Exception("Invalid IP address: " + address);
+         }
+         ip[index] = (byte) octet;
+     }
+     return ip;
+ }
+
+ /**
+  * Configures log4j from the given XML file, optionally watching the file for changes.
+  * If the file is missing or unreadable, the basic log4j configuration is used instead.
+  *
+  * @param logConfigFile the log4j XML configuration file
+  * @param logWatchConfig the watch interval in seconds as text; zero disables watching.
+  *                       Values that are not non-negative integers are reported and treated as zero
+  */
+ private void configureLogging(File logConfigFile, String logWatchConfig)
+ {
+     int logWatchTime;
+     try
+     {
+         logWatchTime = Integer.parseInt(logWatchConfig);
+     }
+     catch (NumberFormatException e)
+     {
+         // handled together with negative values below
+         logWatchTime = -1;
+     }
+     if (logWatchTime < 0)
+     {
+         System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
+                            "a non-negative integer. Using default of zero (no watching configured)");
+         logWatchTime = 0;
+     }
+
+     if (logConfigFile.exists() && logConfigFile.canRead())
+     {
+         System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
+         if (logWatchTime > 0)
+         {
+             System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
+                                logWatchTime + " seconds");
+             // log4j expects the watch interval in milliseconds; multiply as long so that
+             // large second counts cannot overflow int
+             DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000L);
+         }
+         else
+         {
+             DOMConfigurator.configure(logConfigFile.getAbsolutePath());
+         }
+     }
+     else
+     {
+         System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+         System.err.println("Using basic log4j configuration");
+         BasicConfigurator.configure();
+     }
+ }
+
+ private void createAndRegisterBrokerMBean()
+ throws AMQException
+ {
+ new AMQBrokerManager().register();
+ }
+
+ /**
+ * MBean interface implemented by AMQBrokerManager. It declares no operations of its
+ * own; everything is inherited from ManagedBroker.
+ */
+ public interface AMQBrokerManagerMBean extends ManagedBroker
+ {
+
+ }
+ /**
+ * AMQBrokerManager implements the broker management interface and exposes
+ * broker-level management features such as creating and deleting exchanges and queues.
+ */
+ private final class AMQBrokerManager extends DefaultManagedObject
+ implements AMQBrokerManagerMBean
+ {
+ // Collaborators looked up once from the application registry at construction time.
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final ExchangeFactory _exchangeFactory;
+ private final MessageStore _messageStore;
+
+ /**
+  * Creates the management bean and captures the registries, exchange factory and
+  * message store of the current application registry.
+  */
+ protected AMQBrokerManager()
+ {
+     super(ManagedBroker.class, ManagedBroker.TYPE);
+
+     final IApplicationRegistry registry = ApplicationRegistry.getInstance();
+     _queueRegistry = registry.getQueueRegistry();
+     _exchangeRegistry = registry.getExchangeRegistry();
+     _exchangeFactory = registry.getExchangeFactory();
+     _messageStore = registry.getMessageStore();
+ }
+
+ /**
+  * @return the fully qualified class name of this bean, used as its instance name
+  */
+ public String getObjectInstanceName()
+ {
+     final Class<?> beanClass = getClass();
+     return beanClass.getName();
+ }
+
+ /**
+  * Creates an exchange and adds it to the exchange registry. The existence check and the
+  * registration are performed while holding the registry's monitor.
+  *
+  * @param exchangeName name of the exchange to create
+  * @param type exchange type, e.g. direct
+  * @param durable durable flag passed to the exchange factory
+  * @param autoDelete auto-delete flag passed to the exchange factory
+  * @throws JMException if an exchange with this name already exists; an MBeanException
+  *                     wrapping the AMQException if creation or registration fails
+  */
+ public void createNewExchange(String exchangeName,
+                               String type,
+                               boolean durable,
+                               boolean autoDelete)
+         throws JMException
+ {
+     try
+     {
+         synchronized (_exchangeRegistry)
+         {
+             if (_exchangeRegistry.getExchange(exchangeName) != null)
+             {
+                 throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
+             }
+
+             final int ticket = 0; // ticket no
+             Exchange created = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, ticket);
+             _exchangeRegistry.registerExchange(created);
+         }
+     }
+     catch (AMQException failure)
+     {
+         _logger.error("Error in creating exchange " + exchangeName, failure);
+         throw new MBeanException(failure, failure.toString());
+     }
+ }
+
+ /**
+  * Removes the named exchange from the exchange registry.
+  *
+  * @param exchangeName name of the exchange to unregister
+  * @throws JMException an MBeanException wrapping the AMQException if the registry
+  *                     rejects the request
+  */
+ public void unregisterExchange(String exchangeName)
+         throws JMException
+ {
+     // TODO
+     // Check if the exchange is in use.
+     // Check if there are queue-bindings with the exchange and unregister
+     // when there are no bindings.
+     try
+     {
+         _exchangeRegistry.unregisterExchange(exchangeName, false);
+     }
+     catch (AMQException ex)
+     {
+         _logger.error("Error in unregistering exchange " + exchangeName, ex);
+         throw new MBeanException(ex, ex.toString());
+     }
+ }
+
+ /**
+  * Creates a queue and adds it to the queue registry. A queue that is durable and not
+  * auto-delete is also created in the message store before it is registered.
+  *
+  * @param queueName name of the queue to create
+  * @param durable whether the queue is durable
+  * @param owner owner of the queue
+  * @param autoDelete whether the queue is auto-delete
+  * @throws JMException if a queue with this name already exists; an MBeanException
+  *                     wrapping the AMQException if creation or registration fails
+  */
+ public void createQueue(String queueName,
+                         boolean durable,
+                         String owner,
+                         boolean autoDelete)
+         throws JMException
+ {
+     if (_queueRegistry.getQueue(queueName) != null)
+     {
+         throw new JMException("The queue \"" + queueName + "\" already exists.");
+     }
+
+     try
+     {
+         final AMQQueue created = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+         final boolean persistent = created.isDurable() && !created.isAutoDelete();
+         if (persistent)
+         {
+             _messageStore.createQueue(created);
+         }
+         _queueRegistry.registerQueue(created);
+     }
+     catch (AMQException failure)
+     {
+         _logger.error("Error in creating queue " + queueName, failure);
+         throw new MBeanException(failure, failure.toString());
+     }
+ }
+
+ /**
+  * Deletes the named queue and removes it from the message store.
+  *
+  * @param queueName name of the queue to delete
+  * @throws JMException if no queue with this name is registered; an MBeanException
+  *                     wrapping the AMQException if the deletion fails
+  */
+ public void deleteQueue(String queueName) throws JMException
+ {
+     AMQQueue queue = _queueRegistry.getQueue(queueName);
+     if (queue == null)
+     {
+         throw new JMException("The Queue " + queueName + " is not a registered queue.");
+     }
+
+     try
+     {
+         queue.delete();
+         _messageStore.removeQueue(queueName);
+     }
+     catch (AMQException ex)
+     {
+         // log before rethrowing, as the other management operations do
+         _logger.error("Error in deleting queue " + queueName, ex);
+         throw new MBeanException(ex, ex.toString());
+     }
+ }
+
+ /**
+  * Builds the JMX object name of this bean as "&lt;domain&gt;:type=&lt;type&gt;".
+  *
+  * @return the object name
+  * @throws MalformedObjectNameException if the assembled name is not a valid object name
+  */
+ public ObjectName getObjectName() throws MalformedObjectNameException
+ {
+     final String name = new StringBuilder(ManagedObject.DOMAIN)
+             .append(":type=")
+             .append(getType())
+             .toString();
+     return new ObjectName(name);
+ }
+ } // End of MBean class
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/Main.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.server;
+
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * Management interface through which a channel can be inspected and its
+ * transactions committed or rolled back.
+ *
+ * @author Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedChannel
+{
+    /** Type name of the managed channel object. */
+    String TYPE = "Channel";
+
+    //********** Attributes *****************//
+
+    /**
+     * Reports whether the channel is transactional.
+     *
+     * @return true if the channel is transactional
+     * @throws IOException
+     */
+    boolean isTransactional() throws IOException;
+
+    /**
+     * Reports how many messages on this channel have not been acknowledged.
+     *
+     * @return the number of unacknowledged messages
+     * @throws IOException
+     */
+    int getUnacknowledgedMessageCount() throws IOException;
+
+    //********** Operations *****************//
+
+    /**
+     * Commits the transactions if the channel is transactional.
+     *
+     * @throws IOException
+     * @throws JMException
+     */
+    void commitTransactions() throws IOException, JMException;
+
+    /**
+     * Rolls back the transactions if the channel is transactional.
+     *
+     * @throws IOException
+     * @throws JMException
+     */
+    void rollbackTransactions() throws IOException, JMException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/ManagedChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+
+import java.util.List;
+
+/**
+ * Signals that a required delivery could not be made. This could be because the
+ * immediate flag was set and the queue had no consumers, or because the mandatory
+ * flag was set and the exchange had no valid bindings.
+ *
+ * The exception keeps the publish body, content header and content bodies of the
+ * undelivered message so that a return message can be built from them.
+ */
+public abstract class RequiredDeliveryException extends AMQException
+{
+    private final String _message;
+    private final BasicPublishBody _publishBody;
+    private final ContentHeaderBody _contentHeaderBody;
+    private final List<ContentBody> _contentBodies;
+
+    /**
+     * @param message the text used as exception message and as reply text of the return message
+     * @param payload the message that could not be delivered
+     */
+    public RequiredDeliveryException(String message, AMQMessage payload)
+    {
+        this(message,
+             payload.getPublishBody(),
+             payload.getContentHeaderBody(),
+             payload.getContentBodies());
+    }
+
+    /**
+     * @param message the text used as exception message and as reply text of the return message
+     * @param publishBody publish body of the undelivered message
+     * @param contentHeaderBody content header of the undelivered message
+     * @param contentBodies content bodies of the undelivered message
+     */
+    public RequiredDeliveryException(String message,
+                                     BasicPublishBody publishBody,
+                                     ContentHeaderBody contentHeaderBody,
+                                     List<ContentBody> contentBodies)
+    {
+        super(message);
+        _message = message;
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+        _contentBodies = contentBodies;
+    }
+
+    /** @return the reply code placed in the return message */
+    public abstract int getReplyCode();
+
+    /** @return the same value as {@link #getReplyCode()} */
+    public int getErrorCode()
+    {
+        return getReplyCode();
+    }
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public List<ContentBody> getContentBodies()
+    {
+        return _contentBodies;
+    }
+
+    /**
+     * Builds the frames that return the undelivered message: a basic.return frame,
+     * followed by the content header frame and one frame per content body.
+     *
+     * @param channel the channel number stamped on every frame
+     * @return the composite block holding all frames in order
+     */
+    public CompositeAMQDataBlock getReturnMessage(int channel)
+    {
+        final BasicReturnBody basicReturn = new BasicReturnBody();
+        basicReturn.exchange = _publishBody.exchange;
+        basicReturn.replyCode = getReplyCode();
+        basicReturn.replyText = _message;
+        basicReturn.routingKey = _publishBody.routingKey;
+
+        // slot 0: return, slot 1: header, remaining slots: content bodies
+        final AMQFrame[] frames = new AMQFrame[2 + _contentBodies.size()];
+
+        final AMQFrame head = new AMQFrame();
+        head.bodyFrame = basicReturn;
+        head.channel = channel;
+        frames[0] = head;
+
+        frames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
+
+        int slot = 2;
+        for (ContentBody body : _contentBodies)
+        {
+            frames[slot++] = ContentBody.createAMQFrame(channel, body);
+        }
+
+        return new CompositeAMQDataBlock(frames);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/RequiredDeliveryException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.lang.reflect.Field;
+
+/**
+ * This class contains utilities for populating classes automatically from values pulled from configuration
+ * files.
+ */
+public class Configurator
+{
+ private static final Logger _logger = Logger.getLogger(Configurator.class);
+
+ /**
+ * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+ * currently configured but this could easily be added if required.
+ * @param instance the instance to configure
+ */
+ public static void configure(Object instance)
+ {
+ final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+
+ for (Field f : instance.getClass().getDeclaredFields())
+ {
+ Configured annotation = f.getAnnotation(Configured.class);
+ if (annotation != null)
+ {
+ setValueInField(f, instance, config, annotation);
+ }
+ }
+ }
+
+ private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation)
+ {
+ Class fieldClass = f.getType();
+ String configPath = annotation.path();
+ try
+ {
+ if (fieldClass == String.class)
+ {
+ String val = config.getString(configPath, annotation.defaultValue());
+ val = PropertyUtils.replaceProperties(val);
+ f.set(instance, val);
+ }
+ else if (fieldClass == int.class)
+ {
+ int val = config.getInt(configPath, Integer.parseInt(annotation.defaultValue()));
+ f.setInt(instance, val);
+ }
+ else if (fieldClass == long.class)
+ {
+ long val = config.getLong(configPath, Long.parseLong(annotation.defaultValue()));
+ f.setLong(instance, val);
+ }
+ else if (fieldClass == double.class)
+ {
+ double val = config.getDouble(configPath, Double.parseDouble(annotation.defaultValue()));
+ f.setDouble(instance, val);
+ }
+ else if (fieldClass == boolean.class)
+ {
+ boolean val = config.getBoolean(configPath, Boolean.parseBoolean(annotation.defaultValue()));
+ f.setBoolean(instance, val);
+ }
+ else
+ {
+ _logger.error("Unsupported field type " + fieldClass + " for " + f + " IGNORING configured value");
+ }
+ }
+ catch (PropertyException e)
+ {
+ _logger.error("Unable to expand property: " + e + " INGORING field " + f, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ _logger.error("Unable to access field " + f + " IGNORING configured value");
+ }
+ }
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/Configurator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,217 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+
+
+import java.util.Collection;
+
+public class VirtualHostConfiguration
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class);
+
+ XMLConfiguration _config;
+
+ private static final String XML_VIRTUALHOST = "virtualhost";
+ private static final String XML_PATH = "path";
+ private static final String XML_BIND = "bind";
+ private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
+ private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+
+
+ public VirtualHostConfiguration(String configFile) throws ConfigurationException
+ {
+ _logger.info("Loading Config file:" + configFile);
+
+ _config = new XMLConfiguration(configFile);
+
+ if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
+ {
+ throw new ConfigurationException(
+ "Virtualhost Configuration document does not contain a valid virtualhost.");
+ }
+ }
+
+ public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
+ {
+ Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
+
+ if (prop instanceof Collection)
+ {
+ _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
+
+ int virtualhosts = ((Collection) prop).size();
+ for (int vhost = 0; vhost < virtualhosts; vhost++)
+ {
+ loadVirtualHost(vhost);
+ }
+ }
+ else
+ {
+ loadVirtualHost(-1);
+ }
+ }
+
+ private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+ {
+ String path = XML_VIRTUALHOST;
+
+ if (index != -1)
+ {
+ path = path + "(" + index + ")";
+ }
+
+ Object prop = _config.getProperty(path + "." + XML_PATH);
+
+ if (prop == null)
+ {
+ prop = _config.getProperty(path + "." + XML_BIND);
+ String error = "Virtual Host not defined for binding";
+
+ if (prop != null)
+ {
+ if (prop instanceof Collection)
+ {
+ error += "s";
+ }
+
+ error += ": " + prop;
+ }
+
+ throw new ConfigurationException(error);
+ }
+
+ _logger.info("VirtualHost:'" + prop + "'");
+
+ prop = _config.getProperty(path + "." + XML_BIND);
+ if (prop instanceof Collection)
+ {
+ int bindings = ((Collection) prop).size();
+ _logger.debug("Number of Bindings: " + bindings);
+ for (int dest = 0; dest < bindings; dest++)
+ {
+ loadBinding(path, dest);
+ }
+ }
+ else
+ {
+ loadBinding(path, -1);
+ }
+ }
+
+ private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
+ {
+ String path = rootpath + "." + XML_BIND;
+ if (index != -1)
+ {
+ path = path + "(" + index + ")";
+ }
+
+ String bindingString = _config.getString(path);
+
+ AMQBindingURL binding = new AMQBindingURL(bindingString);
+
+ _logger.debug("Loaded Binding:" + binding);
+
+ try
+ {
+ bind(binding);
+ }
+ catch (AMQException amqe)
+ {
+ _logger.info("Unable to bind url: " + binding);
+ throw amqe;
+ }
+ }
+
+ private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ {
+
+ String queueName = binding.getQueueName();
+
+ // This will occur if the URL is a Topic
+ if (queueName == null)
+ {
+ //todo register valid topic
+ ///queueName = binding.getDestinationName();
+ throw new AMQException("Topics cannot be bound. TODO Register valid topic");
+ }
+
+ //Get references to Broker Registries
+ QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
+ MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
+ ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+
+ synchronized (queueRegistry)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+
+ if (queue == null)
+ {
+ _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+
+ queue = new AMQQueue(queueName,
+ Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
+ null /* These queues will have no owner */,
+ false /* Therefore autodelete makes no sence */, queueRegistry);
+
+ if (queue.isDurable())
+ {
+ messageStore.createQueue(queue);
+ }
+
+ queueRegistry.registerQueue(queue);
+ }
+ else
+ {
+ _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+ }
+
+ Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
+ synchronized (defaultExchange)
+ {
+ if (defaultExchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
+ }
+
+ defaultExchange.registerQueue(queue.getName(), queue, null);
+
+ if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+ {
+ throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+ }
+
+ queue.bind(binding.getRoutingKey(), defaultExchange);
+ }
+ _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+
+/**
+ * Common base for exchange implementations. It holds the name, durability, ticket and auto-delete
+ * settings supplied at initialisation, together with the management bean created for the exchange.
+ */
+public abstract class AbstractExchange implements Exchange, Managable
+{
+    private String _name;
+
+    protected boolean _durable;
+
+    protected int _ticket;
+
+    protected ExchangeMBean _exchangeMbean;
+
+    /**
+     * Whether the exchange is automatically deleted once all queues have detached from it
+     */
+    protected boolean _autoDelete;
+
+    /**
+     * Partial management bean for an exchange. It answers the attribute queries of the exchange
+     * management interface from the state of the enclosing exchange; concrete exchange MBeans
+     * should extend it.
+     */
+    protected abstract class ExchangeMBean extends DefaultManagedObject implements ManagedExchange
+    {
+        public ExchangeMBean()
+        {
+            super(ManagedExchange.class, ManagedExchange.TYPE);
+        }
+
+        /** @return the exchange name, used as the object instance name */
+        public String getObjectInstanceName()
+        {
+            return _name;
+        }
+
+        /** @return the exchange name */
+        public String getName()
+        {
+            return _name;
+        }
+
+        /** @return the ticket the exchange was initialised with */
+        public int getTicketNo()
+        {
+            return _ticket;
+        }
+
+        /** @return the durable flag of the exchange */
+        public boolean isDurable()
+        {
+            return _durable;
+        }
+
+        /** @return the auto-delete flag of the exchange */
+        public boolean isAutoDelete()
+        {
+            return _autoDelete;
+        }
+
+    } // End of MBean class
+
+    /**
+     * Template method through which a concrete exchange supplies its management bean. It is invoked
+     * from {@link #initialise} after the exchange settings have been stored.
+     * @return the MBean
+     */
+    protected abstract ExchangeMBean createMBean();
+
+    /**
+     * Stores the exchange settings, then creates and registers the management bean.
+     */
+    public void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+    {
+        this._name = name;
+        this._durable = durable;
+        this._ticket = ticket;
+        this._autoDelete = autoDelete;
+
+        // All settings are in place before the bean is created.
+        final ExchangeMBean mbean = createMBean();
+        this._exchangeMbean = mbean;
+        mbean.register();
+    }
+
+    /**
+     * Unregisters the management bean, if one has been created.
+     */
+    public void close() throws AMQException
+    {
+        if (_exchangeMbean == null)
+        {
+            return;
+        }
+        _exchangeMbean.unregister();
+    }
+
+    public String getName()
+    {
+        return this._name;
+    }
+
+    public boolean isDurable()
+    {
+        return this._durable;
+    }
+
+    public boolean isAutoDelete()
+    {
+        return this._autoDelete;
+    }
+
+    public int getTicket()
+    {
+        return this._ticket;
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return this._exchangeMbean;
+    }
+
+    /** @return the runtime class name followed by the exchange name in square brackets */
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder(getClass().getName());
+        text.append("[").append(getName()).append("]");
+        return text.toString();
+    }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/AbstractExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Exchange factory backed by a table from exchange type name to implementation class. The table
+ * is populated with the "direct", "topic" and "headers" types on construction.
+ */
+public class DefaultExchangeFactory implements ExchangeFactory
+{
+    private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+
+    private Map<String, Class<? extends Exchange>> _exchangeClassMap = new HashMap<String, Class<? extends Exchange>>();
+
+    public DefaultExchangeFactory()
+    {
+        _exchangeClassMap.put("direct", DestNameExchange.class);
+        _exchangeClassMap.put("topic", DestWildExchange.class);
+        _exchangeClassMap.put("headers", HeadersExchange.class);
+    }
+
+    /**
+     * Instantiates the implementation registered for the given type and initialises it.
+     *
+     * @param exchange name given to the new exchange
+     * @param type exchange type name used to select the implementation class
+     * @param durable durable flag passed on to the exchange
+     * @param autoDelete auto-delete flag passed on to the exchange
+     * @param ticket ticket passed on to the exchange
+     * @return the initialised exchange
+     * @throws AMQException if the type is not registered, the implementation cannot be instantiated,
+     *                      or initialisation of the exchange fails
+     */
+    public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete,
+                                   int ticket)
+            throws AMQException
+    {
+        final Class<? extends Exchange> implementation = _exchangeClassMap.get(type);
+        if (implementation != null)
+        {
+            try
+            {
+                final Exchange created = implementation.newInstance();
+                created.initialise(exchange, durable, ticket, autoDelete);
+                return created;
+            }
+            catch (InstantiationException instantiationFailure)
+            {
+                throw new AMQException(_logger, "Unable to create exchange: " + instantiationFailure, instantiationFailure);
+            }
+            catch (IllegalAccessException accessFailure)
+            {
+                throw new AMQException(_logger, "Unable to create exchange: " + accessFailure, accessFailure);
+            }
+        }
+
+        // No implementation is registered under this type name.
+        throw new AMQException(_logger, "Unknown exchange type: " + type);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
------------------------------------------------------------------------------
svn:eol-style = native