You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [6/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * An {@link OutputStream} that publishes the bytes written to it as a
+ * sequence of messages on a destination.
+ * <p>
+ * Bytes are collected in a fixed 64k buffer. Whenever the buffer fills up, or
+ * {@link #flush()} is called, the buffered bytes are sent as one
+ * {@link ActiveMQBytesMessage}. Every message of a stream uses the producer id
+ * as its group id and carries the running message sequence as its group
+ * sequence. {@link #close()} finishes the stream with an empty message whose
+ * group sequence is -1.
+ *
+ * @version $Revision$
+ */
+public class ActiveMQOutputStream extends OutputStream implements Disposable {
+
+    /** Number of valid bytes currently held in {@link #buffer}. */
+    protected int count;
+
+    /** Staging buffer: data is sent down in messages of at most 64k. */
+    final byte buffer[] = new byte[64 * 1024];
+
+    private final ActiveMQConnection connection;
+    /** Private copy of the caller's properties, or null when none were given. */
+    private final Map<String, Object> properties;
+    private final ProducerInfo info;
+
+    /** Sequence number used for the next message id / group sequence. */
+    private long messageSequence;
+    private boolean closed;
+    private final int deliveryMode;
+    private final int priority;
+    private final long timeToLive;
+
+    /**
+     * Creates the stream, registers it with the connection and sends the
+     * producer info for it asynchronously.
+     *
+     * @param connection the connection used to send the messages
+     * @param producerId the id of the producer backing this stream
+     * @param destination where the stream messages are sent; must not be null
+     * @param properties properties set on every message of the stream; copied,
+     *                may be null
+     * @param deliveryMode the delivery mode passed on with every send
+     * @param priority the priority passed on with every send
+     * @param timeToLive the time to live passed on with every send
+     * @throws InvalidDestinationException if destination is null
+     * @throws JMSException if the producer info cannot be sent
+     */
+    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
+                                long timeToLive) throws JMSException {
+        this.connection = connection;
+        this.deliveryMode = deliveryMode;
+        this.priority = priority;
+        this.timeToLive = timeToLive;
+        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
+
+        if (destination == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        }
+
+        this.info = new ProducerInfo(producerId);
+        this.info.setDestination(destination);
+
+        this.connection.addOutputStream(this);
+        this.connection.asyncSendPacket(info);
+    }
+
+    /**
+     * Flushes any buffered bytes, sends the end-of-stream marker message,
+     * disposes this stream and sends the command produced by
+     * <code>info.createRemoveCommand()</code>. Does nothing when the stream is
+     * already closed.
+     *
+     * @throws IOException if the buffered data, the end-of-stream message or
+     *                 the remove command cannot be sent
+     */
+    public void close() throws IOException {
+        if (!closed) {
+            flushBuffer();
+            try {
+                // Send an EOS style empty message to signal EOS.
+                send(new ActiveMQMessage(), true);
+                dispose();
+                this.connection.asyncSendPacket(info.createRemoveCommand());
+            } catch (JMSException e) {
+                // The converted exception has to be thrown; merely creating
+                // it would make a failed close look like a successful one.
+                throw IOExceptionSupport.create(e);
+            }
+        }
+    }
+
+    /**
+     * Unregisters this stream from the connection and marks it closed without
+     * sending anything. Does nothing when the stream is already closed.
+     */
+    public void dispose() {
+        if (!closed) {
+            this.connection.removeOutputStream(this);
+            closed = true;
+        }
+    }
+
+    /**
+     * Buffers a single byte and sends the buffer once it is full.
+     *
+     * @param b the byte to write (only the low eight bits are used)
+     * @throws IOException if a full buffer cannot be sent
+     */
+    public synchronized void write(int b) throws IOException {
+        buffer[count++] = (byte) b;
+        if (count == buffer.length) {
+            flushBuffer();
+        }
+    }
+
+    /**
+     * Buffers <code>len</code> bytes of <code>b</code> starting at
+     * <code>off</code>, sending the buffer every time it becomes full.
+     *
+     * @param b the source array
+     * @param off index of the first byte to write
+     * @param len number of bytes to write
+     * @throws IOException if a full buffer cannot be sent
+     */
+    public synchronized void write(byte b[], int off, int len) throws IOException {
+        while (len > 0) {
+            // Copy as much as still fits into the buffer.
+            int max = Math.min(len, buffer.length - count);
+            System.arraycopy(b, off, buffer, count, max);
+
+            len -= max;
+            count += max;
+            off += max;
+
+            if (count == buffer.length) {
+                flushBuffer();
+            }
+        }
+    }
+
+    /**
+     * Sends whatever is currently buffered as one message.
+     *
+     * @throws IOException if the message cannot be sent
+     */
+    public synchronized void flush() throws IOException {
+        flushBuffer();
+    }
+
+    /**
+     * Sends the buffered bytes, if any, as one bytes message and resets the
+     * buffer. The buffer is only reset after the send succeeded.
+     *
+     * @throws IOException if the message cannot be built or sent
+     */
+    private void flushBuffer() throws IOException {
+        if (count != 0) {
+            try {
+                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+                msg.writeBytes(buffer, 0, count);
+                send(msg, false);
+            } catch (JMSException e) {
+                throw IOExceptionSupport.create(e);
+            }
+            count = 0;
+        }
+    }
+
+    /**
+     * Stamps the message with the stream properties, type, group id and group
+     * sequence and sends it through the connection.
+     *
+     * @param msg the message to send
+     * @param eosMessage true for the end-of-stream marker, which gets group
+     *                sequence -1; its negation is handed to
+     *                <code>connection.send</code> as the last argument
+     *                (NOTE(review): presumably the "wait for reply" flag -
+     *                confirm against ActiveMQConnection)
+     * @throws JMSException if a property cannot be set or the send fails
+     */
+    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
+        if (properties != null) {
+            for (Map.Entry<String, Object> entry : properties.entrySet()) {
+                msg.setObjectProperty(entry.getKey(), entry.getValue());
+            }
+        }
+        msg.setType("org.apache.activemq.Stream");
+        msg.setGroupID(info.getProducerId().toString());
+        if (eosMessage) {
+            msg.setGroupSequence(-1);
+        } else {
+            msg.setGroupSequence((int) messageSequence);
+        }
+        // The sequence is consumed here, for marker and data messages alike.
+        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
+        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
+    }
+
+    public String toString() {
+        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Defines the prefetch message policies for different types of consumers
+ * 
+ * @org.apache.xbean.XBean element="prefetchPolicy"
+ * @version $Revision: 1.3 $
+ */
+public class ActiveMQPrefetchPolicy implements Serializable {
+    private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
+    /** Upper bound applied by the clamping setters: Short.MAX_VALUE - 1. */
+    private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+    private int queuePrefetch;
+    private int queueBrowserPrefetch;
+    private int topicPrefetch;
+    private int durableTopicPrefetch;
+    private int optimizeDurableTopicPrefetch;
+    private int inputStreamPrefetch;
+    private int maximumPendingMessageLimit;
+
+    /**
+     * Initialize default prefetch policies: 1000 for queues, 500 for queue
+     * browsers, the maximum for topics, 100 for durable topics, 1000 for
+     * optimized durable topics and 100 for input streams.
+     */
+    public ActiveMQPrefetchPolicy() {
+        this.queuePrefetch = 1000;
+        this.queueBrowserPrefetch = 500;
+        this.topicPrefetch = MAX_PREFETCH_SIZE;
+        this.durableTopicPrefetch = 100;
+        this.optimizeDurableTopicPrefetch = 1000;
+        this.inputStreamPrefetch = 100;
+    }
+
+    /**
+     * @return Returns the durableTopicPrefetch.
+     */
+    public int getDurableTopicPrefetch() {
+        return durableTopicPrefetch;
+    }
+
+    /**
+     * @param durableTopicPrefetch The durableTopicPrefetch to set; values
+     *                above the maximum prefetch size are reduced to it.
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
+    }
+
+    /**
+     * @return Returns the queuePrefetch.
+     */
+    public int getQueuePrefetch() {
+        return queuePrefetch;
+    }
+
+    /**
+     * @param queuePrefetch The queuePrefetch to set; values above the maximum
+     *                prefetch size are reduced to it.
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
+    }
+
+    /**
+     * @return Returns the queueBrowserPrefetch.
+     */
+    public int getQueueBrowserPrefetch() {
+        return queueBrowserPrefetch;
+    }
+
+    /**
+     * @param queueBrowserPrefetch The queueBrowserPrefetch to set; values
+     *                above the maximum prefetch size are reduced to it.
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
+    }
+
+    /**
+     * @return Returns the topicPrefetch.
+     */
+    public int getTopicPrefetch() {
+        return topicPrefetch;
+    }
+
+    /**
+     * @param topicPrefetch The topicPrefetch to set; values above the maximum
+     *                prefetch size are reduced to it.
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
+    }
+
+    /**
+     * @return Returns the optimizeDurableTopicPrefetch.
+     */
+    public int getOptimizeDurableTopicPrefetch() {
+        return optimizeDurableTopicPrefetch;
+    }
+
+    /**
+     * @param optimizeAcknowledgePrefetch The optimizeDurableTopicPrefetch to
+     *                set. Unlike the other prefetch setters this value is
+     *                stored as given, without applying the maximum.
+     */
+    public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) {
+        this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch;
+    }
+
+    /**
+     * @return Returns the maximumPendingMessageLimit.
+     */
+    public int getMaximumPendingMessageLimit() {
+        return maximumPendingMessageLimit;
+    }
+
+    /**
+     * Sets how many messages a broker will keep around, above the prefetch
+     * limit, for non-durable topics before starting to discard older messages.
+     */
+    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
+        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
+    }
+
+    /**
+     * Caps a requested prefetch at the maximum prefetch size, logging a
+     * warning when the requested value had to be reduced.
+     *
+     * @param value the requested prefetch
+     * @return value, or the maximum prefetch size if value exceeds it
+     */
+    private int getMaxPrefetchLimit(int value) {
+        int result = Math.min(value, MAX_PREFETCH_SIZE);
+        if (result < value) {
+            LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
+        }
+        return result;
+    }
+
+    /**
+     * Sets every prefetch value of this policy to the same number. The values
+     * whose individual setters apply the maximum prefetch size get the capped
+     * number here too; the optimized durable topic prefetch is stored as
+     * given, matching its own setter. The maximum pending message limit is
+     * not touched.
+     *
+     * @param i the prefetch to use for all consumer types
+     */
+    public void setAll(int i) {
+        // Cap once so that an oversized value is only warned about once.
+        int limited = getMaxPrefetchLimit(i);
+        this.durableTopicPrefetch = limited;
+        this.queueBrowserPrefetch = limited;
+        this.queuePrefetch = limited;
+        this.topicPrefetch = limited;
+        // This used to be the literal 1 instead of the parameter i.
+        this.inputStreamPrefetch = limited;
+        this.optimizeDurableTopicPrefetch = i;
+    }
+
+    /**
+     * @return Returns the inputStreamPrefetch.
+     */
+    public int getInputStreamPrefetch() {
+        return inputStreamPrefetch;
+    }
+
+    /**
+     * @param inputStreamPrefetch The inputStreamPrefetch to set; values above
+     *                the maximum prefetch size are reduced to it.
+     */
+    public void setInputStreamPrefetch(int inputStreamPrefetch) {
+        this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+
+/**
+ * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
+ * queue without removing them. <p/>
+ * <P>
+ * The <CODE>getEnumeration</CODE> method returns a <CODE>
+ * java.util.Enumeration</CODE>
+ * that is used to scan the queue's messages. It may be an enumeration of the
+ * entire content of a queue, or it may contain only the messages matching a
+ * message selector. <p/>
+ * <P>
+ * Messages may be arriving and expiring while the scan is done. The JMS API
+ * does not require the content of an enumeration to be a static snapshot of
+ * queue content. Whether these changes are visible or not depends on the JMS
+ * provider. <p/>
+ * <P>
+ * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
+ * </CODE>
+ * or a <CODE>QueueSession</CODE>.
+ * 
+ * @see javax.jms.Session#createBrowser
+ * @see javax.jms.QueueSession#createBrowser
+ * @see javax.jms.QueueBrowser
+ * @see javax.jms.QueueReceiver
+ */
+
+public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
+
+    private final ActiveMQSession session;
+    private final ActiveMQDestination destination;
+    private final String selector;
+
+    // Consumer feeding the current browse; null while no browse is active.
+    private ActiveMQMessageConsumer consumer;
+    private boolean closed;
+    private final ConsumerId consumerId;
+    // Becomes true when a dispatch without a message arrives (end of browse).
+    private final AtomicBoolean browseDone = new AtomicBoolean(true);
+    private final boolean dispatchAsync;
+    // Monitor the browsing thread waits on until the next dispatch arrives.
+    private final Object semaphore = new Object();
+
+    /**
+     * Constructor for an ActiveMQQueueBrowser - used internally. Creates the
+     * underlying consumer right away.
+     * 
+     * @param session the session this browser belongs to
+     * @param consumerId the id used for the underlying consumer
+     * @param destination the queue to browse
+     * @param selector the message selector, as returned by
+     *                {@link #getMessageSelector()}
+     * @param dispatchAsync passed through to the underlying consumer
+     * @throws JMSException if the underlying consumer cannot be created
+     */
+    protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
+        this.session = session;
+        this.consumerId = consumerId;
+        this.destination = destination;
+        this.selector = selector;
+        this.dispatchAsync = dispatchAsync;
+        this.consumer = createConsumer();
+    }
+
+    /**
+     * Starts a new browse: resets the "browse done" flag and creates a
+     * consumer that uses the queue browser prefetch and the maximum pending
+     * message limit of the connection's prefetch policy. The consumer flags
+     * the browse as done when it is dispatched a message-less dispatch and
+     * wakes up a waiting browsing thread on every dispatch.
+     * 
+     * @return the consumer feeding this browser
+     * @throws JMSException if the consumer cannot be created
+     */
+    private ActiveMQMessageConsumer createConsumer() throws JMSException {
+        browseDone.set(false);
+        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
+        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
+            .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
+            public void dispatch(MessageDispatch md) {
+                if (md.getMessage() == null) {
+                    browseDone.set(true);
+                } else {
+                    super.dispatch(md);
+                }
+                notifyMessageAvailable();
+            }
+        };
+    }
+
+    /**
+     * Closes and forgets the current consumer, if there is one. A failure to
+     * close is reported to the connection, like other internal errors of this
+     * class, and the consumer is forgotten regardless so that a broken
+     * consumer is never reused.
+     */
+    private synchronized void destroyConsumer() {
+        ActiveMQMessageConsumer current = consumer;
+        if (current == null) {
+            return;
+        }
+        consumer = null;
+        try {
+            current.close();
+        } catch (JMSException e) {
+            this.session.connection.onClientInternalException(e);
+        }
+    }
+
+    /**
+     * Gets an enumeration for browsing the current queue messages in the order
+     * they would be received. The browser itself is the enumeration; a new
+     * consumer is created if the previous browse has already been finished.
+     * 
+     * @return an enumeration for browsing the messages
+     * @throws JMSException if the JMS provider fails to get the enumeration for
+     *                 this browser due to some internal error.
+     */
+
+    public Enumeration getEnumeration() throws JMSException {
+        checkClosed();
+        if (consumer == null) {
+            consumer = createConsumer();
+        }
+        return this;
+    }
+
+    /**
+     * @throws IllegalStateException if this browser has been closed
+     */
+    private void checkClosed() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    /**
+     * Waits until a message is available, the browse is done, the session
+     * stops running or the calling thread is interrupted.
+     * 
+     * @return true if more messages to process
+     */
+    public boolean hasMoreElements() {
+        while (true) {
+
+            // Work on a snapshot: the field may be cleared by a concurrent
+            // close() right after the null check.
+            ActiveMQMessageConsumer current;
+            synchronized (this) {
+                current = consumer;
+            }
+            if (current == null) {
+                return false;
+            }
+
+            if (current.getMessageSize() > 0) {
+                return true;
+            }
+
+            if (browseDone.get() || !session.isRunning()) {
+                destroyConsumer();
+                return false;
+            }
+
+            waitForMessage();
+            if (Thread.currentThread().isInterrupted()) {
+                // With the interrupt flag set every further wait would
+                // return at once, so stop instead of spinning.
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Waits for the next message under the same conditions as
+     * {@link #hasMoreElements()}.
+     * 
+     * @return the next message, or null if there is none, the receive failed
+     *         or the calling thread was interrupted
+     */
+    public Object nextElement() {
+        while (true) {
+
+            // See hasMoreElements() for why a snapshot is used.
+            ActiveMQMessageConsumer current;
+            synchronized (this) {
+                current = consumer;
+            }
+            if (current == null) {
+                return null;
+            }
+
+            try {
+                Message answer = current.receiveNoWait();
+                if (answer != null) {
+                    return answer;
+                }
+            } catch (JMSException e) {
+                this.session.connection.onClientInternalException(e);
+                return null;
+            }
+
+            if (browseDone.get() || !session.isRunning()) {
+                destroyConsumer();
+                return null;
+            }
+
+            waitForMessage();
+            if (Thread.currentThread().isInterrupted()) {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying consumer and marks this browser as closed.
+     */
+    public synchronized void close() throws JMSException {
+        destroyConsumer();
+        closed = true;
+    }
+
+    /**
+     * Gets the queue associated with this queue browser.
+     * 
+     * @return the queue
+     * @throws JMSException if the JMS provider fails to get the queue
+     *                 associated with this browser due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        return (Queue)destination;
+    }
+
+    /**
+     * @return the message selector this browser was created with
+     */
+    public String getMessageSelector() throws JMSException {
+        return selector;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * Wait on a semaphore for a fixed amount of time (two seconds) for a
+     * message to come in. If the thread is interrupted the interrupt flag is
+     * set again before returning.
+     */
+    protected void waitForMessage() {
+        try {
+            synchronized (semaphore) {
+                semaphore.wait(2000);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Wakes up every thread currently blocked in {@link #waitForMessage()}.
+     */
+    protected void notifyMessageAvailable() {
+        synchronized (semaphore) {
+            semaphore.notifyAll();
+        }
+    }
+
+    public String toString() {
+        return "ActiveMQQueueBrowser { value=" + consumerId + " }";
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * A client uses a <CODE>QueueReceiver</CODE> object to receive messages that
+ * have been delivered to a queue. <p/>
+ * <P>
+ * Although it is possible to have multiple <CODE>QueueReceiver</CODE> s for
+ * the same queue, the JMS API does not define how messages are distributed
+ * between the <CODE>QueueReceiver</CODE>s. <p/>
+ * <P>
+ * If a <CODE>QueueReceiver</CODE> specifies a message selector, the messages
+ * that are not selected remain on the queue. By definition, a message selector
+ * allows a <CODE>QueueReceiver</CODE> to skip messages. This means that when
+ * the skipped messages are eventually read, the total ordering of the reads
+ * does not retain the partial order defined by each message producer. Only
+ * <CODE>QueueReceiver</CODE> s without a message selector will read messages
+ * in message producer order. <p/>
+ * <P>
+ * Creating a <CODE>MessageConsumer</CODE> provides the same features as
+ * creating a <CODE>QueueReceiver</CODE>. A <CODE>MessageConsumer</CODE>
+ * object is recommended for creating new code. The <CODE>QueueReceiver
+ * </CODE>
+ * is provided to support existing code.
+ * 
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, String)
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+ * @see javax.jms.QueueSession#createReceiver(Queue, String)
+ * @see javax.jms.QueueSession#createReceiver(Queue)
+ * @see javax.jms.MessageConsumer
+ */
+
+public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements QueueReceiver {
+
+    /**
+     * Builds a receiver by handing all arguments to the
+     * {@link ActiveMQMessageConsumer} constructor; the arguments that have no
+     * counterpart here are passed as <code>null</code>, <code>false</code>,
+     * <code>false</code> and <code>null</code>.
+     * 
+     * @param theSession the session creating this receiver
+     * @param consumerId the id of this consumer
+     * @param destination the queue to receive from
+     * @param selector the message selector
+     * @param prefetch the prefetch handed to the consumer
+     * @param maximumPendingMessageCount the pending message limit handed to
+     *                the consumer
+     * @param asyncDispatch passed through to the consumer
+     * @throws JMSException if the consumer cannot be created
+     */
+    protected ActiveMQQueueReceiver(ActiveMQSession theSession,
+                                    ConsumerId consumerId,
+                                    ActiveMQDestination destination,
+                                    String selector,
+                                    int prefetch,
+                                    int maximumPendingMessageCount,
+                                    boolean asyncDispatch) throws JMSException {
+        super(theSession, consumerId, destination, null, selector,
+              prefetch, maximumPendingMessageCount,
+              false, false, asyncDispatch, null);
+    }
+
+    /**
+     * Returns the destination of this receiver as a <CODE>Queue</CODE>, after
+     * checking that the receiver has not been closed.
+     * 
+     * @return this receiver's <CODE>Queue</CODE>
+     * @throws JMSException if the closed check fails or the JMS provider
+     *                 fails to get the queue due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        checkClosed();
+        final Queue queue = (Queue)super.getDestination();
+        return queue;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * A client uses a <CODE>QueueSender</CODE> object to send messages to a
+ * queue. <p/>
+ * <P>
+ * Normally, the <CODE>Queue</CODE> is specified when a <CODE>QueueSender
+ * </CODE>
+ * is created. In this case, an attempt to use the <CODE>send</CODE> methods
+ * for an unidentified <CODE>QueueSender</CODE> will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * If the <CODE>QueueSender</CODE> is created with an unidentified <CODE>
+ * Queue</CODE>,
+ * an attempt to use the <CODE>send</CODE> methods that assume that the
+ * <CODE>Queue</CODE> has been identified will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * During the execution of its <CODE>send</CODE> method, a message must not be
+ * changed by other threads within the client. If the message is modified, the
+ * result of the <CODE>send</CODE> is undefined. <p/>
+ * <P>
+ * After sending a message, a client may retain and modify it without affecting
+ * the message that has been sent. The same message object may be sent multiple
+ * times. <p/>
+ * <P>
+ * The following message headers are set as part of sending a message:
+ * <code>JMSDestination</code>, <code>JMSDeliveryMode</code>,<code>JMSExpiration</code>,<code>JMSPriority</code>,
+ * <code>JMSMessageID</code> and <code>JMSTimeStamp</code>. When the
+ * message is sent, the values of these headers are ignored. After the
+ * completion of the <CODE>send</CODE>, the headers hold the values specified
+ * by the method sending the message. It is possible for the <code>send</code>
+ * method not to set <code>JMSMessageID</code> and <code>JMSTimeStamp</code>
+ * if the setting of these headers is explicitly disabled by the
+ * <code>MessageProducer.setDisableMessageID</code> or
+ * <code>MessageProducer.setDisableMessageTimestamp</code> method. <p/>
+ * <P>
+ * Creating a <CODE>MessageProducer</CODE> provides the same features as
+ * creating a <CODE>QueueSender</CODE>. A <CODE>MessageProducer</CODE>
+ * object is recommended when creating new code. The <CODE>QueueSender</CODE>
+ * is provided to support existing code.
+ * 
+ * @see javax.jms.MessageProducer
+ * @see javax.jms.QueueSession#createSender(Queue)
+ */
+
+public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
+
+    protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination,int sendTimeout)
+        throws JMSException {
+        super(session, session.getNextProducerId(), destination,sendTimeout);
+    }
+
+    /**
+     * Gets the queue associated with this <CODE>QueueSender</CODE>.
+     * 
+     * @return this sender's queue
+     * @throws JMSException if the JMS provider fails to get the queue for this
+     *                 <CODE>QueueSender</CODE> due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        return (Queue)super.getDestination();
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer. Uses the
+     * <CODE>QueueSender</CODE>'s default delivery mode, priority, and time
+     * to live. <p/>
+     * <P>
+     * Typically, a message producer is assigned a queue at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the queue be supplied every time a message is sent.
+     * 
+     * @param queue the queue to send this message to
+     * @param message the message to send
+     * @throws JMSException if the JMS provider fails to send the message due to
+     *                 some internal error.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void send(Queue queue, Message message) throws JMSException {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer,
+     * specifying delivery mode, priority and time to live. <p/>
+     * <P>
+     * Typically, a message producer is assigned a queue at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the queue be supplied every time a message is sent.
+     * 
+     * @param queue the queue to send this message to
+     * @param message the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority the priority for this message
+     * @param timeToLive the message's lifetime (in milliseconds)
+     * @throws JMSException if the JMS provider fails to send the message due to
+     *                 some internal error.
+     */
+
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
+        throws JMSException {
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A {@link QueueSession} wrapper that restricts another session to
+ * queue-only use.
+ * <p>
+ * The ActiveMQSessions implement both Topic and Queue Sessions methods, but
+ * the spec states that a Queue session should throw Exceptions if topic
+ * operations are attempted on it. This wrapper enforces that rule:
+ * <ul>
+ * <li>topic-only operations ({@code createDurableSubscriber},
+ * {@code createTemporaryTopic}, {@code createTopic} and {@code unsubscribe})
+ * always throw {@link javax.jms.IllegalStateException};</li>
+ * <li>{@code createConsumer} and {@code createProducer} throw
+ * {@link InvalidDestinationException} when the destination is a
+ * {@link Topic};</li>
+ * <li>every other operation is delegated to the wrapped session.</li>
+ * </ul>
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ActiveMQQueueSession implements QueueSession {
+
+    /** The wrapped session that receives every permitted call. */
+    private final QueueSession next;
+
+    /**
+     * Creates a wrapper around the given session.
+     * 
+     * @param next the session to delegate permitted operations to
+     */
+    public ActiveMQQueueSession(QueueSession next) {
+        this.next = next;
+    }
+
+    /**
+     * Rejects topic destinations; shared guard for the createConsumer and
+     * createProducer methods.
+     * 
+     * @param destination the destination to check
+     * @throws InvalidDestinationException if the destination is a Topic
+     */
+    private static void rejectTopic(Destination destination) throws InvalidDestinationException {
+        if (destination instanceof Topic) {
+            throw new InvalidDestinationException("Topics are not supported by a QueueSession");
+        }
+    }
+
+    /**
+     * Closes the wrapped session.
+     * 
+     * @throws JMSException if the wrapped session fails to close
+     */
+    public void close() throws JMSException {
+        next.close();
+    }
+
+    /**
+     * Commits on the wrapped session.
+     * 
+     * @throws JMSException if the wrapped session fails to commit
+     */
+    public void commit() throws JMSException {
+        next.commit();
+    }
+
+    /**
+     * Creates a browser for the queue via the wrapped session.
+     * 
+     * @param queue the queue to browse
+     * @return the browser created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the browser
+     */
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        return next.createBrowser(queue);
+    }
+
+    /**
+     * Creates a browser for the queue, with a selector, via the wrapped
+     * session.
+     * 
+     * @param queue the queue to browse
+     * @param messageSelector the message selector passed to the wrapped session
+     * @return the browser created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the browser
+     */
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        return next.createBrowser(queue, messageSelector);
+    }
+
+    /**
+     * Creates a bytes message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public BytesMessage createBytesMessage() throws JMSException {
+        return next.createBytesMessage();
+    }
+
+    /**
+     * Creates a consumer for a non-topic destination via the wrapped session.
+     * 
+     * @param destination the destination to consume from; must not be a Topic
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if the destination is a Topic
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        rejectTopic(destination);
+        return next.createConsumer(destination);
+    }
+
+    /**
+     * Creates a consumer, with a selector, for a non-topic destination via the
+     * wrapped session.
+     * 
+     * @param destination the destination to consume from; must not be a Topic
+     * @param messageSelector the message selector passed to the wrapped session
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if the destination is a Topic
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        rejectTopic(destination);
+        return next.createConsumer(destination, messageSelector);
+    }
+
+    /**
+     * Creates a consumer, with a selector and noLocal flag, for a non-topic
+     * destination via the wrapped session.
+     * 
+     * @param destination the destination to consume from; must not be a Topic
+     * @param messageSelector the message selector passed to the wrapped session
+     * @param noLocal the noLocal flag passed to the wrapped session
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if the destination is a Topic
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        rejectTopic(destination);
+        return next.createConsumer(destination, messageSelector, noLocal);
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     * 
+     * @param topic ignored
+     * @param name ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     * 
+     * @param topic ignored
+     * @param name ignored
+     * @param messageSelector ignored
+     * @param noLocal ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Creates a map message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public MapMessage createMapMessage() throws JMSException {
+        return next.createMapMessage();
+    }
+
+    /**
+     * Creates a plain message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public Message createMessage() throws JMSException {
+        return next.createMessage();
+    }
+
+    /**
+     * Creates an empty object message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return next.createObjectMessage();
+    }
+
+    /**
+     * Creates an object message for the given object via the wrapped session.
+     * 
+     * @param object the object passed to the wrapped session
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return next.createObjectMessage(object);
+    }
+
+    /**
+     * Creates a producer for a non-topic destination via the wrapped session.
+     * 
+     * @param destination the destination to produce to; must not be a Topic
+     * @return the producer created by the wrapped session
+     * @throws InvalidDestinationException if the destination is a Topic
+     * @throws JMSException if the wrapped session fails to create the producer
+     */
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        rejectTopic(destination);
+        return next.createProducer(destination);
+    }
+
+    /**
+     * Creates a queue identity via the wrapped session.
+     * 
+     * @param queueName the queue name passed to the wrapped session
+     * @return the queue returned by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the queue
+     */
+    public Queue createQueue(String queueName) throws JMSException {
+        return next.createQueue(queueName);
+    }
+
+    /**
+     * Creates a receiver for the queue via the wrapped session.
+     * 
+     * @param queue the queue to receive from
+     * @return the receiver created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the receiver
+     */
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        return next.createReceiver(queue);
+    }
+
+    /**
+     * Creates a receiver for the queue, with a selector, via the wrapped
+     * session.
+     * 
+     * @param queue the queue to receive from
+     * @param messageSelector the message selector passed to the wrapped session
+     * @return the receiver created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the receiver
+     */
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+        return next.createReceiver(queue, messageSelector);
+    }
+
+    /**
+     * Creates a sender for the queue via the wrapped session.
+     * 
+     * @param queue the queue to send to
+     * @return the sender created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the sender
+     */
+    public QueueSender createSender(Queue queue) throws JMSException {
+        return next.createSender(queue);
+    }
+
+    /**
+     * Creates a stream message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public StreamMessage createStreamMessage() throws JMSException {
+        return next.createStreamMessage();
+    }
+
+    /**
+     * Creates a temporary queue via the wrapped session.
+     * 
+     * @return the temporary queue created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the queue
+     */
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return next.createTemporaryQueue();
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     * 
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Creates an empty text message via the wrapped session.
+     * 
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public TextMessage createTextMessage() throws JMSException {
+        return next.createTextMessage();
+    }
+
+    /**
+     * Creates a text message with the given text via the wrapped session.
+     * 
+     * @param text the text passed to the wrapped session
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return next.createTextMessage(text);
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     * 
+     * @param topicName ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public Topic createTopic(String topicName) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Compares this wrapper with another object. A wrapper is always equal to
+     * itself; any other comparison is delegated to the wrapped session.
+     * 
+     * @param arg0 the object to compare with
+     * @return true if arg0 is this wrapper, otherwise whatever the wrapped
+     *         session's equals returns for arg0
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object arg0) {
+        // Without this short-circuit, x.equals(x) would ask the wrapped
+        // session whether it equals the wrapper, breaking reflexivity.
+        if (this == arg0) {
+            return true;
+        }
+        return next.equals(arg0);
+    }
+
+    /**
+     * Returns the acknowledge mode of the wrapped session.
+     * 
+     * @return the value reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the mode
+     */
+    public int getAcknowledgeMode() throws JMSException {
+        return next.getAcknowledgeMode();
+    }
+
+    /**
+     * Returns the message listener of the wrapped session.
+     * 
+     * @return the listener reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the listener
+     */
+    public MessageListener getMessageListener() throws JMSException {
+        return next.getMessageListener();
+    }
+
+    /**
+     * Returns the transacted flag of the wrapped session.
+     * 
+     * @return the value reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the flag
+     */
+    public boolean getTransacted() throws JMSException {
+        return next.getTransacted();
+    }
+
+    /**
+     * Returns the hash code of the wrapped session.
+     * 
+     * @return the wrapped session's hash code
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        return next.hashCode();
+    }
+
+    /**
+     * Calls recover on the wrapped session.
+     * 
+     * @throws JMSException if the wrapped session fails to recover
+     */
+    public void recover() throws JMSException {
+        next.recover();
+    }
+
+    /**
+     * Rolls back on the wrapped session.
+     * 
+     * @throws JMSException if the wrapped session fails to roll back
+     */
+    public void rollback() throws JMSException {
+        next.rollback();
+    }
+
+    /**
+     * Calls run on the wrapped session.
+     */
+    public void run() {
+        next.run();
+    }
+
+    /**
+     * Sets the message listener on the wrapped session.
+     * 
+     * @param listener the listener passed to the wrapped session
+     * @throws JMSException if the wrapped session fails to set the listener
+     */
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        next.setMessageListener(listener);
+    }
+
+    /**
+     * Returns the string form of the wrapped session.
+     * 
+     * @return the wrapped session's toString value
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return next.toString();
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     * 
+     * @param name ignored
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public void unsubscribe(String name) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Returns the wrapped session.
+     * 
+     * @return the session this wrapper delegates to
+     */
+    public QueueSession getNext() {
+        return next;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java
------------------------------------------------------------------------------
    svn:executable = *