You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC
svn commit: r781177 [6/11] - in /activemq/sandbox/activemq-flow:
activemq-bio/ activemq-bio/src/main/java/org/
activemq-bio/src/main/java/org/apache/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transpo...
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQOutputStream extends OutputStream implements Disposable {
+
+    /** Number of bytes currently held in {@link #buffer} that have not been sent yet. */
+    protected int count;
+
+    /** Staging buffer; data is sent down as messages of at most 64k. */
+    final byte buffer[] = new byte[64 * 1024];
+
+    private final ActiveMQConnection connection;
+    /** Private copy of the caller's properties, applied to every message sent; may be null. */
+    private final Map<String, Object> properties;
+    private final ProducerInfo info;
+
+    /** Sequence number used for the next message id and group sequence. */
+    private long messageSequence;
+    private boolean closed;
+    private final int deliveryMode;
+    private final int priority;
+    private final long timeToLive;
+
+    /**
+     * Creates the stream, registers it with the connection and sends the
+     * producer info for it to the connection.
+     *
+     * @param connection the connection the stream's messages are sent through
+     * @param producerId the id used for the stream's producer info
+     * @param destination where the stream's messages are sent; must not be null
+     * @param properties properties to set on every message, or null for none;
+     *                the map is copied
+     * @param deliveryMode delivery mode passed on every send
+     * @param priority priority passed on every send
+     * @param timeToLive time to live passed on every send
+     * @throws InvalidDestinationException if destination is null
+     * @throws JMSException if sending the producer info fails
+     */
+    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
+                                long timeToLive) throws JMSException {
+        this.connection = connection;
+        this.deliveryMode = deliveryMode;
+        this.priority = priority;
+        this.timeToLive = timeToLive;
+        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
+
+        if (destination == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        }
+
+        this.info = new ProducerInfo(producerId);
+        this.info.setDestination(destination);
+
+        this.connection.addOutputStream(this);
+        this.connection.asyncSendPacket(info);
+    }
+
+    /**
+     * Flushes any buffered bytes, sends an empty end-of-stream message,
+     * disposes the stream and sends the producer's remove command. Does
+     * nothing if the stream is already closed.
+     *
+     * @throws IOException if flushing or sending the end-of-stream message fails
+     */
+    public void close() throws IOException {
+        if (!closed) {
+            flushBuffer();
+            try {
+                // Send an EOS style empty message to signal EOS.
+                send(new ActiveMQMessage(), true);
+                dispose();
+                this.connection.asyncSendPacket(info.createRemoveCommand());
+            } catch (JMSException e) {
+                // The converted exception must be thrown; merely creating it
+                // would silently hide the failure from the caller.
+                throw IOExceptionSupport.create(e);
+            }
+        }
+    }
+
+    /**
+     * Removes this stream from the connection and marks it closed. Unlike
+     * {@link #close()} nothing is flushed or sent.
+     */
+    public void dispose() {
+        if (!closed) {
+            this.connection.removeOutputStream(this);
+            closed = true;
+        }
+    }
+
+    /**
+     * Buffers a single byte, sending the buffer as a message once it is full.
+     *
+     * @param b the byte to write (only the low-order eight bits are stored)
+     * @throws IOException if sending a full buffer fails
+     */
+    public synchronized void write(int b) throws IOException {
+        buffer[count++] = (byte) b;
+        if (count == buffer.length) {
+            flushBuffer();
+        }
+    }
+
+    /**
+     * Buffers {@code len} bytes of {@code b} starting at {@code off}, sending
+     * a message each time the buffer fills up.
+     *
+     * @param b the source array
+     * @param off offset of the first byte to write
+     * @param len number of bytes to write
+     * @throws IOException if sending a full buffer fails
+     */
+    public synchronized void write(byte b[], int off, int len) throws IOException {
+        while (len > 0) {
+            // Copy as much as still fits into the buffer.
+            int max = Math.min(len, buffer.length - count);
+            System.arraycopy(b, off, buffer, count, max);
+
+            len -= max;
+            count += max;
+            off += max;
+
+            if (count == buffer.length) {
+                flushBuffer();
+            }
+        }
+    }
+
+    /**
+     * Sends whatever is currently buffered as one message.
+     *
+     * @throws IOException if the send fails
+     */
+    public synchronized void flush() throws IOException {
+        flushBuffer();
+    }
+
+    /**
+     * Sends the buffered bytes, if any, as a bytes message and resets the
+     * buffer. The buffer is only reset when the send succeeded.
+     */
+    private void flushBuffer() throws IOException {
+        if (count != 0) {
+            try {
+                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+                msg.writeBytes(buffer, 0, count);
+                send(msg, false);
+            } catch (JMSException e) {
+                throw IOExceptionSupport.create(e);
+            }
+            count = 0;
+        }
+    }
+
+    /**
+     * Stamps the message with the stream's properties, type, group id and
+     * group sequence, then sends it through the connection.
+     *
+     * @param msg the message to send
+     * @param eosMessage true if this is the end-of-stream marker, which gets
+     *                group sequence -1 instead of the running sequence
+     * @throws JMSException if setting a property or sending fails
+     */
+    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
+        if (properties != null) {
+            for (Map.Entry<String, Object> entry : properties.entrySet()) {
+                msg.setObjectProperty(entry.getKey(), entry.getValue());
+            }
+        }
+        msg.setType("org.apache.activemq.Stream");
+        msg.setGroupID(info.getProducerId().toString());
+        if (eosMessage) {
+            msg.setGroupSequence(-1);
+        } else {
+            msg.setGroupSequence((int) messageSequence);
+        }
+        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
+        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
+    }
+
+    public String toString() {
+        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Defines the prefetch message policies for different types of consumers
+ *
+ * @org.apache.xbean.XBean element="prefetchPolicy"
+ * @version $Revision: 1.3 $
+ */
+public class ActiveMQPrefetchPolicy implements Serializable {
+    private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
+    /** Upper bound applied by the clamping setters. */
+    private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+    private int queuePrefetch;
+    private int queueBrowserPrefetch;
+    private int topicPrefetch;
+    private int durableTopicPrefetch;
+    private int optimizeDurableTopicPrefetch;
+    private int inputStreamPrefetch;
+    private int maximumPendingMessageLimit;
+
+    /**
+     * Initialize default prefetch policies
+     */
+    public ActiveMQPrefetchPolicy() {
+        this.queuePrefetch = 1000;
+        this.queueBrowserPrefetch = 500;
+        this.topicPrefetch = MAX_PREFETCH_SIZE;
+        this.durableTopicPrefetch = 100;
+        this.optimizeDurableTopicPrefetch = 1000;
+        this.inputStreamPrefetch = 100;
+    }
+
+    /**
+     * @return Returns the durableTopicPrefetch.
+     */
+    public int getDurableTopicPrefetch() {
+        return durableTopicPrefetch;
+    }
+
+    /**
+     * @param durableTopicPrefetch The durableTopicPrefetch to set; values
+     *                above the maximum prefetch size are reduced to it.
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
+    }
+
+    /**
+     * @return Returns the queuePrefetch.
+     */
+    public int getQueuePrefetch() {
+        return queuePrefetch;
+    }
+
+    /**
+     * @param queuePrefetch The queuePrefetch to set; values above the maximum
+     *                prefetch size are reduced to it.
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
+    }
+
+    /**
+     * @return Returns the queueBrowserPrefetch.
+     */
+    public int getQueueBrowserPrefetch() {
+        return queueBrowserPrefetch;
+    }
+
+    /**
+     * @param queueBrowserPrefetch The queueBrowserPrefetch to set; values
+     *                above the maximum prefetch size are reduced to it.
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
+    }
+
+    /**
+     * @return Returns the topicPrefetch.
+     */
+    public int getTopicPrefetch() {
+        return topicPrefetch;
+    }
+
+    /**
+     * @param topicPrefetch The topicPrefetch to set; values above the maximum
+     *                prefetch size are reduced to it.
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
+    }
+
+    /**
+     * @return Returns the optimizeDurableTopicPrefetch.
+     */
+    public int getOptimizeDurableTopicPrefetch() {
+        return optimizeDurableTopicPrefetch;
+    }
+
+    /**
+     * @param optimizeAcknowledgePrefetch The optimizeDurableTopicPrefetch to
+     *                set. The value is stored as given (no maximum applied).
+     */
+    public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) {
+        this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch;
+    }
+
+    /**
+     * @return Returns the maximumPendingMessageLimit.
+     */
+    public int getMaximumPendingMessageLimit() {
+        return maximumPendingMessageLimit;
+    }
+
+    /**
+     * Sets how many messages a broker will keep around, above the prefetch
+     * limit, for non-durable topics before starting to discard older messages.
+     */
+    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
+        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
+    }
+
+    /**
+     * Limits a requested prefetch value to the maximum prefetch size, logging
+     * a warning when the value had to be reduced.
+     *
+     * @param value the requested prefetch
+     * @return the smaller of value and the maximum prefetch size
+     */
+    private int getMaxPrefetchLimit(int value) {
+        int result = Math.min(value, MAX_PREFETCH_SIZE);
+        if (result < value) {
+            LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
+        }
+        return result;
+    }
+
+    /**
+     * Sets every prefetch value of this policy to the same number. The
+     * maximum pending message limit is not touched.
+     *
+     * @param i the prefetch to apply; like the individual setters, values
+     *                above the maximum prefetch size are reduced to it
+     */
+    public void setAll(int i) {
+        // Clamp once so at most one warning is logged per call.
+        int limited = getMaxPrefetchLimit(i);
+        this.durableTopicPrefetch = limited;
+        this.queueBrowserPrefetch = limited;
+        this.queuePrefetch = limited;
+        this.topicPrefetch = limited;
+        // Previously hard-coded to the literal 1, ignoring the argument.
+        this.inputStreamPrefetch = limited;
+        // Stored unclamped to agree with setOptimizeDurableTopicPrefetch.
+        this.optimizeDurableTopicPrefetch = i;
+    }
+
+    /**
+     * @return Returns the inputStreamPrefetch.
+     */
+    public int getInputStreamPrefetch() {
+        return inputStreamPrefetch;
+    }
+
+    /**
+     * @param inputStreamPrefetch The inputStreamPrefetch to set; values above
+     *                the maximum prefetch size are reduced to it.
+     */
+    public void setInputStreamPrefetch(int inputStreamPrefetch) {
+        this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+
+/**
+ * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
+ * queue without removing them. <p/>
+ * <P>
+ * The <CODE>getEnumeration</CODE> method returns a <CODE>
+ * java.util.Enumeration</CODE>
+ * that is used to scan the queue's messages. It may be an enumeration of the
+ * entire content of a queue, or it may contain only the messages matching a
+ * message selector. <p/>
+ * <P>
+ * Messages may be arriving and expiring while the scan is done. The JMS API
+ * does not require the content of an enumeration to be a static snapshot of
+ * queue content. Whether these changes are visible or not depends on the JMS
+ * provider. <p/>
+ * <P>
+ * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
+ * </CODE>
+ * or a <CODE>QueueSession</CODE>.
+ *
+ * @see javax.jms.Session#createBrowser
+ * @see javax.jms.QueueSession#createBrowser
+ * @see javax.jms.QueueBrowser
+ * @see javax.jms.QueueReceiver
+ */
+
+public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
+
+    private final ActiveMQSession session;
+    private final ActiveMQDestination destination;
+    private final String selector;
+
+    /** Consumer backing the current enumeration; null once the browse has finished or the browser was closed. */
+    private ActiveMQMessageConsumer consumer;
+    private boolean closed;
+    private final ConsumerId consumerId;
+    /** Set when a dispatch without a message arrives; reset whenever a consumer is created. */
+    private final AtomicBoolean browseDone = new AtomicBoolean(true);
+    private final boolean dispatchAsync;
+    /** Monitor used to park the enumerating thread until a dispatch arrives. */
+    private final Object semaphore = new Object();
+
+    /**
+     * Constructor for an ActiveMQQueueBrowser - used internally
+     *
+     * @param session the session the browser belongs to
+     * @param consumerId the id given to the browser's consumer
+     * @param destination the destination to browse
+     * @param selector the message selector passed to the consumer
+     * @param dispatchAsync the dispatchAsync flag passed to the consumer
+     * @throws JMSException if the consumer cannot be created
+     */
+    protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
+        this.session = session;
+        this.consumerId = consumerId;
+        this.destination = destination;
+        this.selector = selector;
+        this.dispatchAsync = dispatchAsync;
+        this.consumer = createConsumer();
+    }
+
+    /**
+     * Creates the consumer used for browsing, using the queue browser
+     * prefetch and maximum pending message limit of the connection's prefetch
+     * policy. The consumer treats a dispatch without a message as the end of
+     * the browse and wakes up any waiting enumeration on every dispatch.
+     *
+     * @return the newly created consumer
+     * @throws JMSException if the consumer cannot be created
+     */
+    private ActiveMQMessageConsumer createConsumer() throws JMSException {
+        browseDone.set(false);
+        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
+        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
+            .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
+            public void dispatch(MessageDispatch md) {
+                if (md.getMessage() == null) {
+                    browseDone.set(true);
+                } else {
+                    super.dispatch(md);
+                }
+                notifyMessageAvailable();
+            }
+        };
+    }
+
+    /**
+     * Closes and forgets the current consumer, if there is one. A failure to
+     * close is reported to the connection instead of being thrown.
+     */
+    private void destroyConsumer() {
+        ActiveMQMessageConsumer current = consumer;
+        if (current == null) {
+            return;
+        }
+        // Forget the consumer before closing it so that a failing close()
+        // cannot leave a half-closed consumer behind for the next call.
+        consumer = null;
+        try {
+            current.close();
+        } catch (JMSException e) {
+            // Report the same way nextElement() does rather than printing
+            // the stack trace to stderr.
+            this.session.connection.onClientInternalException(e);
+        }
+    }
+
+    /**
+     * Gets an enumeration for browsing the current queue messages in the order
+     * they would be received.
+     *
+     * @return an enumeration for browsing the messages
+     * @throws JMSException if the JMS provider fails to get the enumeration for
+     *                 this browser due to some internal error.
+     */
+
+    public Enumeration getEnumeration() throws JMSException {
+        checkClosed();
+        if (consumer == null) {
+            consumer = createConsumer();
+        }
+        return this;
+    }
+
+    private void checkClosed() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    /**
+     * Blocks until a message is available or the browse has ended.
+     *
+     * @return true if more messages to process
+     */
+    public boolean hasMoreElements() {
+        while (true) {
+
+            // Read the field once under the lock and keep using that
+            // reference; re-reading it after the null check could hit a
+            // consumer that was cleared in the meantime.
+            ActiveMQMessageConsumer current;
+            synchronized (this) {
+                current = consumer;
+            }
+            if (current == null) {
+                return false;
+            }
+
+            if (current.getMessageSize() > 0) {
+                return true;
+            }
+
+            if (browseDone.get() || !session.isRunning()) {
+                destroyConsumer();
+                return false;
+            }
+
+            waitForMessage();
+        }
+    }
+
+    /**
+     * Blocks until a message is available or the browse has ended.
+     *
+     * @return the next message, or null when the browse has ended, the
+     *         browser has no consumer, or receiving failed
+     */
+    public Object nextElement() {
+        while (true) {
+
+            // Same single-read discipline as in hasMoreElements().
+            ActiveMQMessageConsumer current;
+            synchronized (this) {
+                current = consumer;
+            }
+            if (current == null) {
+                return null;
+            }
+
+            try {
+                Message answer = current.receiveNoWait();
+                if (answer != null) {
+                    return answer;
+                }
+            } catch (JMSException e) {
+                this.session.connection.onClientInternalException(e);
+                return null;
+            }
+
+            if (browseDone.get() || !session.isRunning()) {
+                destroyConsumer();
+                return null;
+            }
+
+            waitForMessage();
+        }
+    }
+
+    /**
+     * Closes the browser's consumer and marks the browser closed.
+     */
+    public synchronized void close() throws JMSException {
+        destroyConsumer();
+        closed = true;
+    }
+
+    /**
+     * Gets the queue associated with this queue browser.
+     *
+     * @return the queue
+     * @throws JMSException if the JMS provider fails to get the queue
+     *                 associated with this browser due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        return (Queue)destination;
+    }
+
+    public String getMessageSelector() throws JMSException {
+        return selector;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * Wait on a semaphore for a fixed amount of time for a message to come in.
+     */
+    protected void waitForMessage() {
+        try {
+            synchronized (semaphore) {
+                semaphore.wait(2000);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Wakes up every thread parked in {@link #waitForMessage()}.
+     */
+    protected void notifyMessageAvailable() {
+        synchronized (semaphore) {
+            semaphore.notifyAll();
+        }
+    }
+
+    public String toString() {
+        return "ActiveMQQueueBrowser { value=" + consumerId + " }";
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * A client uses a <CODE>QueueReceiver</CODE> object to receive messages that
+ * have been delivered to a queue. <p/>
+ * <P>
+ * Although it is possible to have multiple <CODE>QueueReceiver</CODE> s for
+ * the same queue, the JMS API does not define how messages are distributed
+ * between the <CODE>QueueReceiver</CODE>s. <p/>
+ * <P>
+ * If a <CODE>QueueReceiver</CODE> specifies a message selector, the messages
+ * that are not selected remain on the queue. By definition, a message selector
+ * allows a <CODE>QueueReceiver</CODE> to skip messages. This means that when
+ * the skipped messages are eventually read, the total ordering of the reads
+ * does not retain the partial order defined by each message producer. Only
+ * <CODE>QueueReceiver</CODE> s without a message selector will read messages
+ * in message producer order. <p/>
+ * <P>
+ * Creating a <CODE>MessageConsumer</CODE> provides the same features as
+ * creating a <CODE>QueueReceiver</CODE>. A <CODE>MessageConsumer</CODE>
+ * object is recommended for creating new code. The <CODE>QueueReceiver
+ * </CODE>
+ * is provided to support existing code.
+ *
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, String)
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+ * @see javax.jms.QueueSession#createReceiver(Queue, String)
+ * @see javax.jms.QueueSession#createReceiver(Queue)
+ * @see javax.jms.MessageConsumer
+ */
+
+public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements QueueReceiver {
+
+    /**
+     * Builds the receiver by handing every argument on to the
+     * ActiveMQMessageConsumer constructor; the remaining consumer arguments
+     * are fixed to null, false, false and null.
+     *
+     * @param theSession the session the receiver is created for
+     * @param consumerId the id of the consumer
+     * @param destination the destination to receive from
+     * @param selector the message selector
+     * @param prefetch the prefetch value
+     * @param maximumPendingMessageCount the maximum pending message count
+     * @param asyncDispatch the asyncDispatch flag
+     * @throws JMSException if the superclass constructor fails
+     */
+    protected ActiveMQQueueReceiver(ActiveMQSession theSession,
+                                    ConsumerId consumerId,
+                                    ActiveMQDestination destination,
+                                    String selector,
+                                    int prefetch,
+                                    int maximumPendingMessageCount,
+                                    boolean asyncDispatch) throws JMSException {
+        super(theSession, consumerId, destination, null, selector,
+              prefetch, maximumPendingMessageCount,
+              false, false, asyncDispatch, null);
+    }
+
+    /**
+     * Returns the destination of this receiver as a <CODE>Queue</CODE>,
+     * after checking that the receiver has not been closed.
+     *
+     * @return this receiver's <CODE>Queue</CODE>
+     * @throws JMSException if the closed check fails or the destination
+     *                 cannot be obtained
+     */
+    public Queue getQueue() throws JMSException {
+        checkClosed();
+        final Queue queue = (Queue) super.getDestination();
+        return queue;
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * A client uses a <CODE>QueueSender</CODE> object to send messages to a
+ * queue. <p/>
+ * <P>
+ * Normally, the <CODE>Queue</CODE> is specified when a <CODE>QueueSender
+ * </CODE>
+ * is created. In this case, an attempt to use the <CODE>send</CODE> methods
+ * for an unidentified <CODE>QueueSender</CODE> will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * If the <CODE>QueueSender</CODE> is created with an unidentified <CODE>
+ * Queue</CODE>,
+ * an attempt to use the <CODE>send</CODE> methods that assume that the
+ * <CODE>Queue</CODE> has been identified will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * During the execution of its <CODE>send</CODE> method, a message must not be
+ * changed by other threads within the client. If the message is modified, the
+ * result of the <CODE>send</CODE> is undefined. <p/>
+ * <P>
+ * After sending a message, a client may retain and modify it without affecting
+ * the message that has been sent. The same message object may be sent multiple
+ * times. <p/>
+ * <P>
+ * The following message headers are set as part of sending a message:
+ * <code>JMSDestination</code>, <code>JMSDeliveryMode</code>,<code>JMSExpiration</code>,<code>JMSPriority</code>,
+ * <code>JMSMessageID</code> and <code>JMSTimeStamp</code>. When the
+ * message is sent, the values of these headers are ignored. After the
+ * completion of the <CODE>send</CODE>, the headers hold the values specified
+ * by the method sending the message. It is possible for the <code>send</code>
+ * method not to set <code>JMSMessageID</code> and <code>JMSTimeStamp</code>
+ * if the setting of these headers is explicitly disabled by the
+ * <code>MessageProducer.setDisableMessageID</code> or
+ * <code>MessageProducer.setDisableMessageTimestamp</code> method. <p/>
+ * <P>
+ * Creating a <CODE>MessageProducer</CODE> provides the same features as
+ * creating a <CODE>QueueSender</CODE>. A <CODE>MessageProducer</CODE>
+ * object is recommended when creating new code. The <CODE>QueueSender</CODE>
+ * is provided to support existing code.
+ *
+ * @see javax.jms.MessageProducer
+ * @see javax.jms.QueueSession#createSender(Queue)
+ */
+
+public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
+
+    /**
+     * Builds the sender as a message producer that uses the session's next
+     * producer id.
+     *
+     * @param session the session the sender is created for
+     * @param destination the destination to send to
+     * @param sendTimeout the send timeout handed to the producer
+     * @throws JMSException if the superclass constructor fails
+     */
+    protected ActiveMQQueueSender(ActiveMQSession session,
+                                  ActiveMQDestination destination,
+                                  int sendTimeout) throws JMSException {
+        super(session, session.getNextProducerId(), destination, sendTimeout);
+    }
+
+    /**
+     * Returns the destination of this <CODE>QueueSender</CODE> as a
+     * <CODE>Queue</CODE>.
+     *
+     * @return this sender's queue
+     * @throws JMSException if the destination cannot be obtained
+     */
+    public Queue getQueue() throws JMSException {
+        final Queue queue = (Queue) super.getDestination();
+        return queue;
+    }
+
+    /**
+     * Sends a message to the given queue by delegating to the producer's
+     * two-argument <CODE>send</CODE>, i.e. without explicit delivery mode,
+     * priority or time to live.
+     *
+     * @param queue the queue to send this message to
+     * @param message the message to send
+     * @throws JMSException if the underlying send fails
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+    public void send(Queue queue, Message message) throws JMSException {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends a message to the given queue with an explicit delivery mode,
+     * priority and time to live by delegating to the producer's
+     * five-argument <CODE>send</CODE>.
+     *
+     * @param queue the queue to send this message to
+     * @param message the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority the priority for this message
+     * @param timeToLive the message's lifetime (in milliseconds)
+     * @throws JMSException if the underlying send fails
+     */
+    public void send(Queue queue,
+                     Message message,
+                     int deliveryMode,
+                     int priority,
+                     long timeToLive) throws JMSException {
+        super.send(queue, message, deliveryMode, priority, timeToLive);
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A QueueSession implementation that throws IllegalStateExceptions when Topic
+ * operations are attempted but which delegates to another QueueSession for all
+ * other operations. The ActiveMQSessions implement both Topic and Queue
+ * Sessions methods but the spec states that Queue session should throw
+ * Exceptions if topic operations are attempted on it.
+ *
+ * @version $Revision: 1.2 $
+ */
+ public class ActiveMQQueueSession implements QueueSession {
+
+    /** The wrapped session; every queue-related call is forwarded to it. */
+    private final QueueSession next;
+
+    /**
+     * Creates a wrapper around the given session.
+     *
+     * @param next the session to delegate to; it is stored as-is and is not
+     *            checked for <code>null</code> here
+     */
+    public ActiveMQQueueSession(QueueSession next) {
+        this.next = next;
+    }
+
+    /**
+     * Closes the wrapped session.
+     *
+     * @throws JMSException if the wrapped session fails to close
+     */
+    public void close() throws JMSException {
+        next.close();
+    }
+
+    /**
+     * Commits on the wrapped session.
+     *
+     * @throws JMSException if the wrapped session fails to commit
+     */
+    public void commit() throws JMSException {
+        next.commit();
+    }
+
+    /**
+     * Creates a browser for the given queue via the wrapped session.
+     *
+     * @param queue the queue to browse
+     * @return the browser created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the browser
+     */
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        return next.createBrowser(queue);
+    }
+
+    /**
+     * Creates a browser for the given queue and selector via the wrapped
+     * session.
+     *
+     * @param queue the queue to browse
+     * @param messageSelector the selector passed through to the wrapped session
+     * @return the browser created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the browser
+     */
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        return next.createBrowser(queue, messageSelector);
+    }
+
+    /**
+     * Creates a bytes message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public BytesMessage createBytesMessage() throws JMSException {
+        return next.createBytesMessage();
+    }
+
+    /**
+     * Creates a consumer for a non-topic destination via the wrapped session.
+     *
+     * @param destination the destination to consume from
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if <code>destination</code> is a
+     *             {@link Topic}
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new InvalidDestinationException("Topics are not supported by a QueueSession");
+        }
+        return next.createConsumer(destination);
+    }
+
+    /**
+     * Creates a consumer with a selector for a non-topic destination via the
+     * wrapped session.
+     *
+     * @param destination the destination to consume from
+     * @param messageSelector the selector passed through to the wrapped session
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if <code>destination</code> is a
+     *             {@link Topic}
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new InvalidDestinationException("Topics are not supported by a QueueSession");
+        }
+        return next.createConsumer(destination, messageSelector);
+    }
+
+    /**
+     * Creates a consumer with a selector and noLocal flag for a non-topic
+     * destination via the wrapped session.
+     *
+     * @param destination the destination to consume from
+     * @param messageSelector the selector passed through to the wrapped session
+     * @param noLocal the noLocal flag passed through to the wrapped session
+     * @return the consumer created by the wrapped session
+     * @throws InvalidDestinationException if <code>destination</code> is a
+     *             {@link Topic}
+     * @throws JMSException if the wrapped session fails to create the consumer
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new InvalidDestinationException("Topics are not supported by a QueueSession");
+        }
+        return next.createConsumer(destination, messageSelector, noLocal);
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     *
+     * @param topic ignored
+     * @param name ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     *
+     * @param topic ignored
+     * @param name ignored
+     * @param messageSelector ignored
+     * @param noLocal ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Creates a map message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public MapMessage createMapMessage() throws JMSException {
+        return next.createMapMessage();
+    }
+
+    /**
+     * Creates a plain message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public Message createMessage() throws JMSException {
+        return next.createMessage();
+    }
+
+    /**
+     * Creates an empty object message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return next.createObjectMessage();
+    }
+
+    /**
+     * Creates an object message carrying the given payload via the wrapped
+     * session.
+     *
+     * @param object the payload passed through to the wrapped session
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return next.createObjectMessage(object);
+    }
+
+    /**
+     * Creates a producer for a non-topic destination via the wrapped session.
+     *
+     * @param destination the destination to produce to
+     * @return the producer created by the wrapped session
+     * @throws InvalidDestinationException if <code>destination</code> is a
+     *             {@link Topic}
+     * @throws JMSException if the wrapped session fails to create the producer
+     */
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        if (destination instanceof Topic) {
+            throw new InvalidDestinationException("Topics are not supported by a QueueSession");
+        }
+        return next.createProducer(destination);
+    }
+
+    /**
+     * Creates a queue identity via the wrapped session.
+     *
+     * @param queueName the queue name passed through to the wrapped session
+     * @return the queue returned by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the queue
+     */
+    public Queue createQueue(String queueName) throws JMSException {
+        return next.createQueue(queueName);
+    }
+
+    /**
+     * Creates a receiver for the given queue via the wrapped session.
+     *
+     * @param queue the queue to receive from
+     * @return the receiver created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the receiver
+     */
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        return next.createReceiver(queue);
+    }
+
+    /**
+     * Creates a receiver with a selector for the given queue via the wrapped
+     * session.
+     *
+     * @param queue the queue to receive from
+     * @param messageSelector the selector passed through to the wrapped session
+     * @return the receiver created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the receiver
+     */
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+        return next.createReceiver(queue, messageSelector);
+    }
+
+    /**
+     * Creates a sender for the given queue via the wrapped session.
+     *
+     * @param queue the queue to send to
+     * @return the sender created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the sender
+     */
+    public QueueSender createSender(Queue queue) throws JMSException {
+        return next.createSender(queue);
+    }
+
+    /**
+     * Creates a stream message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public StreamMessage createStreamMessage() throws JMSException {
+        return next.createStreamMessage();
+    }
+
+    /**
+     * Creates a temporary queue via the wrapped session.
+     *
+     * @return the temporary queue created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the queue
+     */
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return next.createTemporaryQueue();
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     *
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Creates an empty text message via the wrapped session.
+     *
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public TextMessage createTextMessage() throws JMSException {
+        return next.createTextMessage();
+    }
+
+    /**
+     * Creates a text message with the given body via the wrapped session.
+     *
+     * @param text the body passed through to the wrapped session
+     * @return the message created by the wrapped session
+     * @throws JMSException if the wrapped session fails to create the message
+     */
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return next.createTextMessage(text);
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     *
+     * @param topicName ignored
+     * @return never returns normally
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public Topic createTopic(String topicName) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Compares this wrapper with another object. A wrapper is always equal to
+     * itself; for any other argument the decision is left to the wrapped
+     * session's <code>equals</code>.
+     *
+     * @param arg0 the object to compare with
+     * @return <code>true</code> if <code>arg0</code> is this wrapper or the
+     *         wrapped session reports equality with it
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object arg0) {
+        // Identity check first: delegating unconditionally would ask the
+        // wrapped session whether it equals this wrapper, which breaks the
+        // reflexivity required by Object.equals unless the delegate happens
+        // to recognise the wrapper.
+        if (this == arg0) {
+            return true;
+        }
+        return next.equals(arg0);
+    }
+
+    /**
+     * Returns the acknowledge mode of the wrapped session.
+     *
+     * @return the value reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the mode
+     */
+    public int getAcknowledgeMode() throws JMSException {
+        return next.getAcknowledgeMode();
+    }
+
+    /**
+     * Returns the message listener of the wrapped session.
+     *
+     * @return the listener reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the listener
+     */
+    public MessageListener getMessageListener() throws JMSException {
+        return next.getMessageListener();
+    }
+
+    /**
+     * Reports whether the wrapped session is transacted.
+     *
+     * @return the value reported by the wrapped session
+     * @throws JMSException if the wrapped session fails to report the flag
+     */
+    public boolean getTransacted() throws JMSException {
+        return next.getTransacted();
+    }
+
+    /**
+     * Returns the wrapped session's hash code, which keeps this method
+     * consistent with {@link #equals(Object)}.
+     *
+     * @return the hash code of the wrapped session
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        return next.hashCode();
+    }
+
+    /**
+     * Invokes recover on the wrapped session.
+     *
+     * @throws JMSException if the wrapped session fails to recover
+     */
+    public void recover() throws JMSException {
+        next.recover();
+    }
+
+    /**
+     * Rolls back on the wrapped session.
+     *
+     * @throws JMSException if the wrapped session fails to roll back
+     */
+    public void rollback() throws JMSException {
+        next.rollback();
+    }
+
+    /**
+     * Invokes <code>run()</code> on the wrapped session.
+     */
+    public void run() {
+        next.run();
+    }
+
+    /**
+     * Sets the message listener on the wrapped session.
+     *
+     * @param listener the listener passed through to the wrapped session
+     * @throws JMSException if the wrapped session rejects the listener
+     */
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        next.setMessageListener(listener);
+    }
+
+    /**
+     * Returns the wrapped session's string form.
+     *
+     * @return the result of <code>toString()</code> on the wrapped session
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return next.toString();
+    }
+
+    /**
+     * Topic operation; never supported by this queue session.
+     *
+     * @param name ignored
+     * @throws JMSException always, as a {@link javax.jms.IllegalStateException}
+     */
+    public void unsubscribe(String name) throws JMSException {
+        throw new IllegalStateException("Operation not supported by a QueueSession");
+    }
+
+    /**
+     * Exposes the wrapped session.
+     *
+     * @return the session passed to the constructor
+     */
+    public QueueSession getNext() {
+        return next;
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java
------------------------------------------------------------------------------
svn:executable = *