You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [8/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A utility class used by the Session for dispatching messages asynchronously
+ * to consumers
+ * 
+ * @version $Revision$
+ * @see javax.jms.Session
+ */
+public class ActiveMQSessionExecutor implements Task {
+    private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
+
+    private ActiveMQSession session;
+    private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+    private boolean dispatchedBySessionPool;
+    private volatile TaskRunner taskRunner;
+    private boolean startedOrWarnedThatNotStarted;
+
+    ActiveMQSessionExecutor(ActiveMQSession session) {
+        this.session = session;
+    }
+
+    void setDispatchedBySessionPool(boolean value) {
+        dispatchedBySessionPool = value;
+        wakeup();
+    }
+
+    void execute(MessageDispatch message) throws InterruptedException {
+
+        if (!startedOrWarnedThatNotStarted) {
+
+            ActiveMQConnection connection = session.connection;
+            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
+            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
+                startedOrWarnedThatNotStarted = true;
+            } else {
+                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
+
+                // lets only warn when a significant amount of time has passed
+                // just in case its normal operation
+                if (elapsedTime > aboutUnstartedConnectionTimeout) {
+                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
+                             + " Received: " + message);
+                    startedOrWarnedThatNotStarted = true;
+                }
+            }
+        }
+
+        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
+            dispatch(message);
+        } else {
+            messageQueue.enqueue(message);
+            wakeup();
+        }
+    }
+
+    public void wakeup() {
+        if (!dispatchedBySessionPool) {
+            if (session.isSessionAsyncDispatch()) {
+                try {
+                    TaskRunner taskRunner = this.taskRunner;
+                    if (taskRunner == null) {
+                        synchronized (this) {
+                            if (this.taskRunner == null) {
+                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
+                                        "ActiveMQ Session: " + session.getSessionId());
+                            }
+                            taskRunner = this.taskRunner;
+                        }
+                    }
+                    taskRunner.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            } else {
+                while (iterate()) {
+                }
+            }
+        }
+    }
+
+    void executeFirst(MessageDispatch message) {
+        messageQueue.enqueueFirst(message);
+        wakeup();
+    }
+
+    public boolean hasUncomsumedMessages() {
+        return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
+    }
+
+    void dispatch(MessageDispatch message) {
+
+        // TODO - we should use a Map for this indexed by consumerId
+
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
+            ConsumerId consumerId = message.getConsumerId();
+            if (consumerId.equals(consumer.getConsumerId())) {
+                consumer.dispatch(message);
+                break;
+            }
+        }
+    }
+
+    synchronized void start() {
+        if (!messageQueue.isRunning()) {
+            messageQueue.start();
+            if (hasUncomsumedMessages()) {
+                wakeup();
+            }
+        }
+    }
+
+    void stop() throws JMSException {
+        try {
+            if (messageQueue.isRunning()) {
+                messageQueue.stop();
+                TaskRunner taskRunner = this.taskRunner;
+                if (taskRunner != null) {
+                    this.taskRunner = null;
+                    taskRunner.shutdown();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    boolean isRunning() {
+        return messageQueue.isRunning();
+    }
+
+    void close() {
+        messageQueue.close();
+    }
+
+    void clear() {
+        messageQueue.clear();
+    }
+
+    MessageDispatch dequeueNoWait() {
+        return messageQueue.dequeueNoWait();
+    }
+
+    protected void clearMessagesInProgress() {
+        messageQueue.clear();
+    }
+
+    public boolean isEmpty() {
+        return messageQueue.isEmpty();
+    }
+
+    public boolean iterate() {
+
+        // Deliver any messages queued on the consumer to their listeners.
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
+            if (consumer.iterate()) {
+                return true;
+            }
+        }
+
+        // No messages left queued on the listeners.. so now dispatch messages
+        // queued on the session
+        MessageDispatch message = messageQueue.dequeueNoWait();
+        if (message == null) {
+            return false;
+        } else {
+            dispatch(message);
+            return !messageQueue.isEmpty();
+        }
+    }
+
+    List getUnconsumedMessages() {
+        return messageQueue.removeAll();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on
+ * a topic. A <CODE>TopicPublisher</CODE> object is the publish-subscribe
+ * form of a message producer.
+ * <p/>
+ * <P>
+ * Normally, the <CODE>Topic</CODE> is specified when a <CODE>TopicPublisher
+ * </CODE> is created. In this case, an attempt to use the <CODE>publish
+ * </CODE> methods for an unidentified <CODE>TopicPublisher</CODE> will throw
+ * a <CODE>java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * If the <CODE>TopicPublisher</CODE> is created with an unidentified <CODE>
+ * Topic</CODE>, an attempt to use the <CODE>publish</CODE> methods that
+ * assume that the <CODE>Topic</CODE> has been identified will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * During the execution of its <CODE>publish</CODE> method, a message must
+ * not be changed by other threads within the client. If the message is
+ * modified, the result of the <CODE>publish</CODE> is undefined.
+ * <p/>
+ * <P>
+ * After publishing a message, a client may retain and modify it without
+ * affecting the message that has been published. The same message object may
+ * be published multiple times.
+ * <p/>
+ * <P>
+ * The following message headers are set as part of publishing a message:
+ * <code>JMSDestination</code>,<code>JMSDeliveryMode</code>,<code>JMSExpiration</code>,
+ * <code>JMSPriority</code>,<code>JMSMessageID</code> and <code>JMSTimeStamp</code>.
+ * When the message is published, the values of these headers are ignored.
+ * After completion of the <CODE>publish</CODE>, the headers hold the values
+ * specified by the method publishing the message. It is possible for the
+ * <CODE>publish</CODE> method not to set <code>JMSMessageID</code> and
+ * <code>JMSTimeStamp</code> if the setting of these headers is explicitly
+ * disabled by the <code>MessageProducer.setDisableMessageID</code> or <code>MessageProducer.setDisableMessageTimestamp</code>
+ * method.
+ * <p/>
+ * <P>
+ * Creating a <CODE>MessageProducer</CODE> provides the same features as
+ * creating a <CODE>TopicPublisher</CODE>. A <CODE>MessageProducer</CODE>
+ * object is recommended when creating new code. The <CODE>TopicPublisher
+ * </CODE> is provided to support existing code.
+ * <p/>
+ * <p/>
+ * <P>
+ * Because <CODE>TopicPublisher</CODE> inherits from <CODE>MessageProducer
+ * </CODE>, it inherits the <CODE>send</CODE> methods that are a part of the
+ * <CODE>MessageProducer</CODE> interface. Using the <CODE>send</CODE>
+ * methods will have the same effect as using the <CODE>publish</CODE>
+ * methods: they are functionally the same.
+ *
+ * @see Session#createProducer(Destination)
+ * @see TopicSession#createPublisher(Topic)
+ */
+
+public class ActiveMQTopicPublisher extends ActiveMQMessageProducer implements
+        TopicPublisher {
+
+    protected ActiveMQTopicPublisher(ActiveMQSession session,
+                                     ActiveMQDestination destination, int sendTimeout) throws JMSException {
+        super(session, session.getNextProducerId(), destination,sendTimeout);
+    }
+
+    /**
+     * Gets the topic associated with this <CODE>TopicPublisher</CODE>.
+     *
+     * @return this publisher's topic
+     * @throws JMSException if the JMS provider fails to get the topic for this
+     *                      <CODE>TopicPublisher</CODE> due to some internal error.
+     */
+
+    public Topic getTopic() throws JMSException {
+        return (Topic) super.getDestination();
+    }
+
+    /**
+     * Publishes a message to the topic. Uses the <CODE>TopicPublisher</CODE>'s
+     * default delivery mode, priority, and time to live.
+     *
+     * @param message the message to publish
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws MessageFormatException      if an invalid message is specified.
+     * @throws InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> with an invalid topic.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> that did not specify a topic at creation time.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void publish(Message message) throws JMSException {
+        super.send(message);
+    }
+
+    /**
+     * Publishes a message to the topic, specifying delivery mode, priority,
+     * and time to live.
+     *
+     * @param message      the message to publish
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws MessageFormatException      if an invalid message is specified.
+     * @throws InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> with an invalid topic.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> that did not specify a topic at creation time.
+     */
+
+    public void publish(Message message, int deliveryMode, int priority,
+                        long timeToLive) throws JMSException {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Publishes a message to a topic for an unidentified message producer.
+     * Uses the <CODE>TopicPublisher</CODE>'s default delivery mode,
+     * priority, and time to live.
+     * <p/>
+     * <P>
+     * Typically, a message producer is assigned a topic at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the topic be supplied every time a message is published.
+     *
+     * @param topic   the topic to publish this message to
+     * @param message the message to publish
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws MessageFormatException      if an invalid message is specified.
+     * @throws InvalidDestinationException if a client uses this method with an invalid topic.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void publish(Topic topic, Message message) throws JMSException {
+        super.send(topic, message);
+    }
+
+    /**
+     * Publishes a message to a topic for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     * <p/>
+     * <P>
+     * Typically, a message producer is assigned a topic at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the topic be supplied every time a message is published.
+     *
+     * @param topic        the topic to publish this message to
+     * @param message      the message to publish
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws MessageFormatException      if an invalid message is specified.
+     * @throws InvalidDestinationException if a client uses this method with an invalid topic.
+     */
+
+    public void publish(Topic topic, Message message, int deliveryMode,
+                        int priority, long timeToLive) throws JMSException {
+        super.send(topic, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A TopicSession implementation that throws IllegalStateExceptions when Queue
+ * operations are attempted but which delegates to another TopicSession for all
+ * other operations. The ActiveMQSessions implement both Topic and Queue
+ * Sessions methods but the spec states that TopicSession should throw
+ * Exceptions if queue operations are attempted on it.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ActiveMQTopicSession implements TopicSession {
+
+    private final TopicSession next;
+
+    public ActiveMQTopicSession(TopicSession next) {
+        this.next = next;
+    }
+
+    /**
+     * @throws JMSException
+     */
+    public void close() throws JMSException {
+        next.close();
+    }
+
+    /**
+     * @throws JMSException
+     */
+    public void commit() throws JMSException {
+        next.commit();
+    }
+
+    /**
+     * @param queue
+     * @return
+     * @throws JMSException
+     */
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a TopicSession");
+    }
+
+    /**
+     * @param queue
+     * @param messageSelector
+     * @return
+     * @throws JMSException
+     */
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a TopicSession");
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public BytesMessage createBytesMessage() throws JMSException {
+        return next.createBytesMessage();
+    }
+
+    /**
+     * @param destination
+     * @return
+     * @throws JMSException
+     */
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        if (destination instanceof Queue) {
+            throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+        }
+        return next.createConsumer(destination);
+    }
+
+    /**
+     * @param destination
+     * @param messageSelector
+     * @return
+     * @throws JMSException
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        if (destination instanceof Queue) {
+            throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+        }
+        return next.createConsumer(destination, messageSelector);
+    }
+
+    /**
+     * @param destination
+     * @param messageSelector
+     * @param noLocal
+     * @return
+     * @throws JMSException
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        if (destination instanceof Queue) {
+            throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+        }
+        return next.createConsumer(destination, messageSelector, noLocal);
+    }
+
+    /**
+     * @param topic
+     * @param name
+     * @return
+     * @throws JMSException
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        return next.createDurableSubscriber(topic, name);
+    }
+
+    /**
+     * @param topic
+     * @param name
+     * @param messageSelector
+     * @param noLocal
+     * @return
+     * @throws JMSException
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        return next.createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public MapMessage createMapMessage() throws JMSException {
+        return next.createMapMessage();
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public Message createMessage() throws JMSException {
+        return next.createMessage();
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return next.createObjectMessage();
+    }
+
+    /**
+     * @param object
+     * @return
+     * @throws JMSException
+     */
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return next.createObjectMessage(object);
+    }
+
+    /**
+     * @param destination
+     * @return
+     * @throws JMSException
+     */
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        if (destination instanceof Queue) {
+            throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+        }
+        return next.createProducer(destination);
+    }
+
+    /**
+     * @param topic
+     * @return
+     * @throws JMSException
+     */
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        return next.createPublisher(topic);
+    }
+
+    /**
+     * @param queueName
+     * @return
+     * @throws JMSException
+     */
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a TopicSession");
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public StreamMessage createStreamMessage() throws JMSException {
+        return next.createStreamMessage();
+    }
+
+    /**
+     * @param topic
+     * @return
+     * @throws JMSException
+     */
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        return next.createSubscriber(topic);
+    }
+
+    /**
+     * @param topic
+     * @param messageSelector
+     * @param noLocal
+     * @return
+     * @throws JMSException
+     */
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+        return next.createSubscriber(topic, messageSelector, noLocal);
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        throw new IllegalStateException("Operation not supported by a TopicSession");
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        return next.createTemporaryTopic();
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public TextMessage createTextMessage() throws JMSException {
+        return next.createTextMessage();
+    }
+
+    /**
+     * @param text
+     * @return
+     * @throws JMSException
+     */
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return next.createTextMessage(text);
+    }
+
+    /**
+     * @param topicName
+     * @return
+     * @throws JMSException
+     */
+    public Topic createTopic(String topicName) throws JMSException {
+        return next.createTopic(topicName);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object arg0) {
+        return next.equals(arg0);
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public int getAcknowledgeMode() throws JMSException {
+        return next.getAcknowledgeMode();
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public MessageListener getMessageListener() throws JMSException {
+        return next.getMessageListener();
+    }
+
+    /**
+     * @return
+     * @throws JMSException
+     */
+    public boolean getTransacted() throws JMSException {
+        return next.getTransacted();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        return next.hashCode();
+    }
+
+    /**
+     * @throws JMSException
+     */
+    public void recover() throws JMSException {
+        next.recover();
+    }
+
+    /**
+     * @throws JMSException
+     */
+    public void rollback() throws JMSException {
+        next.rollback();
+    }
+
+    /**
+     * 
+     */
+    public void run() {
+        next.run();
+    }
+
+    /**
+     * @param listener
+     * @throws JMSException
+     */
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        next.setMessageListener(listener);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return next.toString();
+    }
+
+    /**
+     * @param name
+     * @throws JMSException
+     */
+    public void unsubscribe(String name) throws JMSException {
+        next.unsubscribe(name);
+    }
+
+    public TopicSession getNext() {
+        return next;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
+ * that have been published to a topic. A <CODE>TopicSubscriber</CODE> object
+ * is the publish/subscribe form of a message consumer. A <CODE>
+ * MessageConsumer</CODE> can be created by using <CODE>
+ * Session.createConsumer</CODE>.
+ * <p/>
+ * <P>
+ * A <CODE>TopicSession</CODE> allows the creation of multiple <CODE>
+ * TopicSubscriber</CODE> objects per topic. It will deliver each message for
+ * a topic to each subscriber eligible to receive it. Each copy of the message
+ * is treated as a completely separate message. Work done on one copy has no
+ * effect on the others; acknowledging one does not acknowledge the others; one
+ * message may be delivered immediately, while another waits for its subscriber
+ * to process messages ahead of it.
+ * <p/>
+ * <P>
+ * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive
+ * only messages that are published while they are active.
+ * <p/>
+ * <P>
+ * Messages filtered out by a subscriber's message selector will never be
+ * delivered to the subscriber. From the subscriber's perspective, they do not
+ * exist.
+ * <p/>
+ * <P>
+ * In some cases, a connection may both publish and subscribe to a topic. The
+ * subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to inhibit
+ * the delivery of messages published by its own connection.
+ * <p/>
+ * <P>
+ * If a client needs to receive all the messages published on a topic,
+ * including the ones published while the subscriber is inactive, it uses a
+ * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record
+ * of this durable subscription and insures that all messages from the topic's
+ * publishers are retained until they are acknowledged by this durable
+ * subscriber or they have expired.
+ * <p/>
+ * <P>
+ * Sessions with durable subscribers must always provide the same client
+ * identifier. In addition, each client must specify a name that uniquely
+ * identifies (within client identifier) each durable subscription it creates.
+ * Only one session at a time can have a <CODE>TopicSubscriber</CODE> for a
+ * particular durable subscription.
+ * <p/>
+ * <P>
+ * A client can change an existing durable subscription by creating a durable
+ * <CODE>TopicSubscriber</CODE> with the same name and a new topic and/or
+ * message selector. Changing a durable subscription is equivalent to
+ * unsubscribing (deleting) the old one and creating a new one.
+ * <p/>
+ * <P>
+ * The <CODE>unsubscribe</CODE> method is used to delete a durable
+ * subscription. The <CODE>unsubscribe</CODE> method can be used at the
+ * <CODE>Session</CODE> or <CODE>TopicSession</CODE> level. This method
+ * deletes the state being maintained on behalf of the subscriber by its
+ * provider.
+ * <p/>
+ * <P>
+ * Creating a <CODE>MessageConsumer</CODE> provides the same features as
+ * creating a <CODE>TopicSubscriber</CODE>. To create a durable subscriber,
+ * use of <CODE>Session.CreateDurableSubscriber</CODE> is recommended. The
+ * <CODE>TopicSubscriber</CODE> is provided to support existing code.
+ *
+ * @see javax.jms.Session#createConsumer
+ * @see javax.jms.Session#createDurableSubscriber
+ * @see javax.jms.TopicSession
+ * @see javax.jms.TopicSession#createSubscriber
+ * @see javax.jms.TopicSubscriber
+ * @see javax.jms.MessageConsumer
+ */
+
+public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements
+        TopicSubscriber {
+
+    /**
+     * @param theSession
+     * @param value 
+     * @param dest
+     * @param name
+     * @param selector
+     * @param cnum
+     * @param noLocalValue
+     * @param browserValue
+     * @param asyncDispatch 
+     * @throws JMSException
+     */
+    protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
+                                      ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
+                                      boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
+        super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch, null);
+    }
+
+    /**
+     * Gets the <CODE>Topic</CODE> associated with this subscriber.
+     *
+     * @return this subscriber's <CODE>Topic</CODE>
+     * @throws JMSException if the JMS provider fails to get the topic for this topic
+     *                      subscriber due to some internal error.
+     */
+
+    public Topic getTopic() throws JMSException {
+        checkClosed();
+        return (Topic) super.getDestination();
+    }
+
+    /**
+     * Gets the <CODE>NoLocal</CODE> attribute for this subscriber. The
+     * default value for this attribute is false.
+     *
+     * @return true if locally published messages are being inhibited
+     * @throws JMSException if the JMS provider fails to get the <CODE>NoLocal
+     *                      </CODE> attribute for this topic subscriber due to some
+     *                      internal error.
+     */
+
+    public boolean getNoLocal() throws JMSException {
+        checkClosed();
+        return super.isNoLocal();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+/**
+ * The XAConnection interface extends the capability of Connection by providing
+ * an XASession (optional).
+ * <p/>
+ * The XAConnection interface is optional. JMS providers are not required to
+ * support this interface. This interface is for use by JMS providers to
+ * support transactional environments. Client programs are strongly encouraged
+ * to use the transactional support  available in their environment, rather
+ * than use these XA  interfaces directly.
+ *
+ * @version $Revision: 1.6 $
+ * @see javax.jms.Connection
+ * @see javax.jms.ConnectionFactory
+ * @see javax.jms.QueueConnection
+ * @see javax.jms.TopicConnection
+ * @see javax.jms.TopicConnectionFactory
+ * @see javax.jms.QueueConnection
+ * @see javax.jms.QueueConnectionFactory
+ */
+public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
+
+    protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+        super(transport, clientIdGenerator, factoryStats);
+    }
+
+    public XASession createXASession() throws JMSException {
+        return (XASession) createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    public XATopicSession createXATopicSession() throws JMSException {
+        return (XATopicSession) createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    public XAQueueSession createXAQueueSession() throws JMSException {
+        return (XAQueueSession) createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync());
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueConnectionFactory;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicConnectionFactory;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+
+/**
+ * A factory of {@link XAConnection} instances
+ * 
+ * @version $Revision: 564057 $
+ */
+public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory {
+
+    public ActiveMQXAConnectionFactory() {
+    }
+
+    public ActiveMQXAConnectionFactory(String userName, String password, String brokerURL) {
+        super(userName, password, brokerURL);
+    }
+
+    public ActiveMQXAConnectionFactory(String userName, String password, URI brokerURL) {
+        super(userName, password, brokerURL);
+    }
+
+    public ActiveMQXAConnectionFactory(String brokerURL) {
+        super(brokerURL);
+    }
+
+    public ActiveMQXAConnectionFactory(URI brokerURL) {
+        super(brokerURL);
+    }
+
+    public XAConnection createXAConnection() throws JMSException {
+        return (XAConnection) createActiveMQConnection();
+    }
+
+    public XAConnection createXAConnection(String userName, String password) throws JMSException {
+        return (XAConnection) createActiveMQConnection(userName, password);
+    }
+
+    public XAQueueConnection createXAQueueConnection() throws JMSException {
+        return (XAQueueConnection) createActiveMQConnection();
+    }
+
+    public XAQueueConnection createXAQueueConnection(String userName, String password) throws JMSException {
+        return (XAQueueConnection) createActiveMQConnection(userName, password);
+    }
+
+    public XATopicConnection createXATopicConnection() throws JMSException {
+        return (XATopicConnection) createActiveMQConnection();
+    }
+
+    public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException {
+        return (XATopicConnection) createActiveMQConnection(userName, password);
+    }
+
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+        ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), stats);
+        return connection;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicSession;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq.command.SessionId;
+
+/**
+ * The XASession interface extends the capability of Session by adding access
+ * to a JMS provider's support for the  Java Transaction API (JTA) (optional).
+ * This support takes the form of a javax.transaction.xa.XAResource object.
+ * The functionality of this object closely resembles that defined by the
+ * standard X/Open XA Resource interface.
+ * <p/>
+ * An application server controls the transactional assignment of an XASession
+ * by obtaining its XAResource. It uses the XAResource to assign the session
+ * to a transaction, prepare and commit work on the transaction, and so on.
+ * <p/>
+ * An XAResource provides some fairly sophisticated facilities for
+ * interleaving work on multiple transactions, recovering a list of
+ * transactions in progress, and so on. A JTA aware JMS provider must fully
+ * implement this functionality. This could be done by using the services of a
+ * database that supports XA, or a JMS provider may choose to implement this
+ * functionality from scratch.
+ * <p/>
+ * A client of the application server is given what it thinks is a regular
+ * JMS Session. Behind the scenes, the application server controls the
+ * transaction management of the underlying XASession.
+ * <p/>
+ * The XASession interface is optional. JMS providers are not required to
+ * support this interface. This interface is for use by JMS providers to
+ * support transactional environments. Client programs are strongly encouraged
+ * to use the transactional support  available in their environment, rather
+ * than use these XA  interfaces directly.
+ *
+ * @version $Revision: 1.5 $
+ * @see javax.jms.Session
+ * @see javax.jms.QueueSession
+ * @see javax.jms.TopicSession
+ * @see javax.jms.XASession
+ */
+public class ActiveMQXASession extends ActiveMQSession implements QueueSession, TopicSession, XAQueueSession, XATopicSession {
+
+    public ActiveMQXASession(ActiveMQXAConnection connection, SessionId sessionId, int theAcknowlegeMode, boolean dispatchAsync) throws JMSException {
+        super(connection, sessionId, theAcknowlegeMode, dispatchAsync);
+    }
+
+    public boolean getTransacted() throws JMSException {
+        return true;
+    }
+
+    public void rollback() throws JMSException {
+        throw new TransactionInProgressException("Cannot rollback() inside an XASession");
+    }
+
+    public void commit() throws JMSException {
+        throw new TransactionInProgressException("Cannot commit() inside an XASession");
+    }
+
+    public Session getSession() throws JMSException {
+        return this;
+    }
+
+    public XAResource getXAResource() {
+        return getTransactionContext();
+    }
+
+    public QueueSession getQueueSession() throws JMSException {
+        return new ActiveMQQueueSession(this);
+    }
+
+    public TopicSession getTopicSession() throws JMSException {
+        return new ActiveMQTopicSession(this);
+    }
+
+    /**
+     * This is called before transacted work is done by
+     * the session.  XA Work can only be done when this
+     * XA resource is associated with an Xid.
+     *
+     * @throws JMSException not associated with an Xid
+     */
+    protected void doStartTransaction() throws JMSException {
+
+        if (!getTransactionContext().isInXATransaction()) {
+            throw new JMSException("Session's XAResource has not been enlisted in a distributed transaction.");
+        }
+
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AdvisoryConsumer implements ActiveMQDispatcher {
+    private static final transient Log LOG = LogFactory.getLog(AdvisoryConsumer.class);
+
+    int deliveredCounter;
+
+    private final ActiveMQConnection connection;
+    private ConsumerInfo info;
+    private boolean closed;
+
+    public AdvisoryConsumer(ActiveMQConnection connection, ConsumerId consumerId) throws JMSException {
+        this.connection = connection;
+        info = new ConsumerInfo(consumerId);
+        info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+        info.setPrefetchSize(1000);
+        info.setNoLocal(true);
+
+        this.connection.addDispatcher(info.getConsumerId(), this);
+        this.connection.syncSendPacket(this.info);
+    }
+
+    public synchronized void dispose() {
+        if (!closed) {
+            try {
+                this.connection.asyncSendPacket(info.createRemoveCommand());
+            } catch (JMSException e) {
+                LOG.info("Failed to send remove command: " + e, e);
+            }
+            this.connection.removeDispatcher(info.getConsumerId());
+            closed = true;
+        }
+    }
+
+    public void dispatch(MessageDispatch md) {
+
+        // Auto ack messages when we reach 75% of the prefetch
+        deliveredCounter++;
+        if (deliveredCounter > (0.75 * info.getPrefetchSize())) {
+            try {
+                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
+                connection.asyncSendPacket(ack);
+                deliveredCounter = 0;
+            } catch (JMSException e) {
+                connection.onClientInternalException(e);
+            }
+        }
+
+        DataStructure o = md.getMessage().getDataStructure();
+        if (o != null && o.getClass() == DestinationInfo.class) {
+            processDestinationInfo((DestinationInfo)o);
+        } else {
+            //This can happen across networks
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unexpected message was dispatched to the AdvisoryConsumer: "+md);
+            }
+        }
+
+    }
+
+    private void processDestinationInfo(DestinationInfo dinfo) {
+        ActiveMQDestination dest = dinfo.getDestination();
+        if (!dest.isTemporary()) {
+            return;
+        }
+
+        ActiveMQTempDestination tempDest = (ActiveMQTempDestination)dest;
+        if (dinfo.getOperationType() == DestinationInfo.ADD_OPERATION_TYPE) {
+            connection.activeTempDestinations.put(tempDest, tempDest);
+        } else if (dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE) {
+            connection.activeTempDestinations.remove(tempDest);
+        }
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception which is closed if you try to access a resource which has already
+ * been closed
+ *
+ * @version $Revision: 1.2 $
+ */
+public class AlreadyClosedException extends JMSException {
+
+    private static final long serialVersionUID = -3203104889571618702L;
+
+    public AlreadyClosedException() {
+        super("this connection");
+    }
+
+    public AlreadyClosedException(String description) {
+        super("Cannot use " + description + " as it has already been closed", "AMQ-1001");
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a message which has a typically out of band Binary Large Object
+ * (BLOB)
+ * 
+ * @version $Revision: $
+ */
+public interface BlobMessage extends Message {
+
+    /**
+     * Return the input stream to process the BLOB
+     */
+    InputStream getInputStream() throws IOException, JMSException;
+
+    /**
+     * Returns the URL for the blob if its available as an external URL (such as file, http, ftp etc)
+     * or null if there is no URL available
+     */
+    URL getURL() throws MalformedURLException, JMSException;
+
+
+    /**
+     * The MIME type of the BLOB which can be used to apply different content types to messages.
+     */
+    String getMimeType();
+
+    /**
+     * Sets the MIME type of the BLOB so that a consumer can process things nicely with a Java Activation Framework
+     * DataHandler
+     */
+    void setMimeType(String mimeType);
+
+
+    String getName();
+
+    /**
+     * The name of the attachment which can be useful information if transmitting files over ActiveMQ
+     */
+    void setName(String name);
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+/**
+ * An exception listener similar to the standard <code>javax.jms.ExceptionListener</code>
+ * which can be used by client code to be notified of exceptions thrown by container components 
+ * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
+ * <p>
+ * The <code>org.apache.activemq.ActiveMQConnection</code> that the listener has been registered with does
+ * this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> describing
+ * the problem.
+ * </p>
+ * 
+ * @author Kai Hudalla
+ * @see ActiveMQConnection#setClientInternalExceptionListener(org.apache.activemq.ClientInternalExceptionListener)
+ */
+public interface ClientInternalExceptionListener
+{
+    /**
+     * Notifies a client of an exception while asynchronously processing a message.
+     * 
+     * @param exception the exception describing the problem
+     */
+    void onException(Throwable exception);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * Provides a uniform interface that can be used to close all the JMS obejcts
+ * that provide a close() method. Useful for when you want to collect a
+ * heterogeous set of JMS object in a collection to be closed at a later time.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public interface Closeable {
+
+    /**
+     * Closes a JMS object.
+     * <P>
+     * Many JMS objects are closeable such as Connections, Sessions, Consumers
+     * and Producers.
+     * 
+     * @throws JMSException if the JMS provider fails to close the object due to
+     *                 some internal error.
+     */
+    void close() throws JMSException;
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception thrown when a service is not correctly configured
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ConfigurationException extends JMSException {
+    private static final long serialVersionUID = 5639082552451065258L;
+
+    public ConfigurationException(String description) {
+        super(description, "AMQ-1002");
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.LinkedHashMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * An auditor class for a Connection that looks for duplicates
+ */
+class ConnectionAudit {
+
+    private boolean checkForDuplicates;
+    private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit> destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
+    private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit> dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
+
+    
+	private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+	private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+	
+	
+    synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
+        dispatchers.remove(dispatcher);
+    }
+
+    synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
+        if (checkForDuplicates && message != null) {
+            ActiveMQDestination destination = message.getDestination();
+            if (destination != null) {
+                if (destination.isQueue()) {
+                    ActiveMQMessageAudit audit = destinations.get(destination);
+                    if (audit == null) {
+                        audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
+                        destinations.put(destination, audit);
+                    }
+                    boolean result = audit.isDuplicate(message);
+                    return result;
+                }
+                ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
+                if (audit == null) {
+                    audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
+                    dispatchers.put(dispatcher, audit);
+                }
+                boolean result = audit.isDuplicate(message);
+                return result;
+            }
+        }
+        return false;
+    }
+
+    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
+        if (checkForDuplicates && message != null) {
+            ActiveMQDestination destination = message.getDestination();
+            if (destination != null) {
+                if (destination.isQueue()) {
+                    ActiveMQMessageAudit audit = destinations.get(destination);
+                    if (audit != null) {
+                        audit.rollback(message);
+                    }
+                } else {
+                    ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
+                    if (audit != null) {
+                        audit.rollback(message);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the checkForDuplicates
+     */
+    boolean isCheckForDuplicates() {
+        return this.checkForDuplicates;
+    }
+
+    /**
+     * @param checkForDuplicates the checkForDuplicates to set
+     */
+    void setCheckForDuplicates(boolean checkForDuplicates) {
+        this.checkForDuplicates = checkForDuplicates;
+    }
+
+	public int getAuditDepth() {
+		return auditDepth;
+	}
+
+	public void setAuditDepth(int auditDepth) {
+		this.auditDepth = auditDepth;
+	}
+
+	public int getAuditMaximumProducerNumber() {
+		return auditMaximumProducerNumber;
+	}
+
+	public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+		this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+	}
+    
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when attempt is made to use a connection when the connection has been closed.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ConnectionClosedException extends IllegalStateException {
+    private static final long serialVersionUID = -7681404582227153308L;
+
+    public ConnectionClosedException() {
+        super("The connection is already closed", "AlreadyClosed");
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception thrown when the a connection failure is detected (peer might
+ * close the connection, or a keep alive times out, etc.)
+ * 
+ * @version $Revision$
+ */
+public class ConnectionFailedException extends JMSException {
+
+    private static final long serialVersionUID = 2288453203492073973L;
+
+    public ConnectionFailedException(IOException cause) {
+        super("The JMS connection has failed: " + extractMessage(cause));
+        initCause(cause);
+        setLinkedException(cause);
+    }
+
+    public ConnectionFailedException() {
+        super("The JMS connection has failed due to a Transport problem");
+    }
+
+    private static String extractMessage(IOException cause) {
+        String m = cause.getMessage();
+        if (m == null || m.length() == 0) {
+            m = cause.toString();
+        }
+        return m;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Represents a hook to allow the support of custom destinations
+ * such as to support <a href="http://activemq.apache.org/camel/">Apache Camel</a>
+ * to create and manage endpoints
+ *
+ * @version $Revision: $
+ */
+public interface CustomDestination extends Destination {
+
+    // Consumers
+    //-----------------------------------------------------------------------
+    MessageConsumer createConsumer(ActiveMQSession session, String messageSelector);
+    MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal);
+
+    TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal);
+    TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal);
+
+    QueueReceiver createReceiver(ActiveMQSession session, String messageSelector);
+
+    // Producers
+    //-----------------------------------------------------------------------
+    MessageProducer createProducer(ActiveMQSession session) throws JMSException;
+
+    TopicPublisher createPublisher(ActiveMQSession session) throws JMSException;
+
+    QueueSender createSender(ActiveMQSession session) throws JMSException;
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * @version $Revision$
+ */
+public interface Disposable {
+    
+    /**
+     */
+    void dispose();
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.TopicConnection;
+import javax.jms.QueueConnection;
+import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.DestinationSource;
+
+/**
+ * A set of enhanced APIs for a JMS provider
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface EnhancedConnection extends TopicConnection, QueueConnection, Closeable {
+    
+    /**
+     * Returns the {@link DestinationSource} object which can be used to listen to destinations
+     * being created or destroyed or to enquire about the current destinations available on the broker
+     *
+     * @return a lazily created destination source
+     * @throws JMSException
+     */
+    DestinationSource getDestinationSource() throws JMSException;
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public interface LocalTransactionEventListener {
+    void beginEvent();
+
+    void commitEvent();
+
+    void rollbackEvent();
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Represents the JMS extension methods in Apache ActiveMQ
+ *
+ * @version $Revision: $
+ */
+public interface Message extends javax.jms.Message {
+
+    /**
+     * Returns the MIME type of this mesage. This can be used in selectors to filter on
+     * the MIME types of the different JMS messages, or in the case of {@link org.apache.activemq.BlobMessage}
+     * it allows you to create a selector on the MIME type of the BLOB body
+     */
+    String getJMSXMimeType();
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * An extended JMS interface that adds the ability to be notified when a 
+ * message is available for consumption using the receive*() methods
+ * which is useful in Ajax style subscription models.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface MessageAvailableConsumer extends MessageConsumer {
+
+    /**
+     * Sets the listener used to notify synchronous consumers that there is a message
+     * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+     */
+    void setAvailableListener(MessageAvailableListener availableListener);
+
+    /**
+     * Gets the listener used to notify synchronous consumers that there is a message
+     * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+     */
+    MessageAvailableListener getAvailableListener();
+}