You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC
svn commit: r781177 [8/11] - in /activemq/sandbox/activemq-flow:
activemq-bio/ activemq-bio/src/main/java/org/
activemq-bio/src/main/java/org/apache/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transpo...
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A utility class used by the Session for dispatching messages asynchronously
+ * to consumers
+ *
+ * @version $Revision$
+ * @see javax.jms.Session
+ */
+public class ActiveMQSessionExecutor implements Task {
+ private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
+
+ /** Session whose consumers receive the dispatched messages. */
+ private ActiveMQSession session;
+ /** Messages that have been handed to this executor but not yet dispatched to a consumer. */
+ private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+ /** When true, {@link #execute} always enqueues and {@link #wakeup()} does nothing. */
+ private boolean dispatchedBySessionPool;
+ /** Created lazily by {@link #wakeup()} and cleared again by {@link #stop()}. */
+ private volatile TaskRunner taskRunner;
+ /** Set once the "connection not started" check in {@link #execute} no longer needs to run. */
+ private boolean startedOrWarnedThatNotStarted;
+
+ /**
+ * Creates an executor that dispatches to the consumers of the given session.
+ *
+ * @param session the owning session
+ */
+ ActiveMQSessionExecutor(ActiveMQSession session) {
+ this.session = session;
+ }
+
+ /**
+ * Records whether dispatching is driven by a session pool and then calls
+ * {@link #wakeup()}.
+ *
+ * @param value the new value of the flag
+ */
+ void setDispatchedBySessionPool(boolean value) {
+ dispatchedBySessionPool = value;
+ wakeup();
+ }
+
+ /**
+ * Accepts a message for delivery. The message is dispatched immediately when
+ * the session is not using async dispatch and is not dispatched by a session
+ * pool; otherwise it is queued and {@link #wakeup()} is called.
+ * <p/>
+ * Until the connection is seen to be started, each call also checks whether a
+ * "connection not started" warning should be logged; the warning is logged at
+ * most once.
+ *
+ * @param message the message to deliver
+ * @throws InterruptedException declared by this method; NOTE(review): which
+ * callee can raise it is not visible here
+ */
+ void execute(MessageDispatch message) throws InterruptedException {
+
+ if (!startedOrWarnedThatNotStarted) {
+
+ ActiveMQConnection connection = session.connection;
+ long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
+ // A negative timeout disables the warning altogether.
+ if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
+ startedOrWarnedThatNotStarted = true;
+ } else {
+ long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
+
+ // lets only warn when a significant amount of time has passed
+ // just in case its normal operation
+ if (elapsedTime > aboutUnstartedConnectionTimeout) {
+ LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
+ + " Received: " + message);
+ startedOrWarnedThatNotStarted = true;
+ }
+ }
+ }
+
+ if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
+ dispatch(message);
+ } else {
+ messageQueue.enqueue(message);
+ wakeup();
+ }
+ }
+
+ /**
+ * Triggers delivery of queued messages, unless dispatching is driven by a
+ * session pool, in which case nothing happens.
+ * <p/>
+ * With async dispatch the task runner is created on first use and woken up;
+ * otherwise {@link #iterate()} is called on the calling thread until it
+ * returns false.
+ */
+ public void wakeup() {
+ if (!dispatchedBySessionPool) {
+ if (session.isSessionAsyncDispatch()) {
+ try {
+ // Read the volatile field once so the lock is only taken when no runner exists yet.
+ TaskRunner taskRunner = this.taskRunner;
+ if (taskRunner == null) {
+ synchronized (this) {
+ // Re-check under the lock so that only one runner is created.
+ if (this.taskRunner == null) {
+ this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
+ "ActiveMQ Session: " + session.getSessionId());
+ }
+ taskRunner = this.taskRunner;
+ }
+ }
+ taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag for the caller instead of propagating.
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ // Synchronous mode: drain on the calling thread.
+ while (iterate()) {
+ }
+ }
+ }
+ }
+
+ /**
+ * Puts the message at the head of the queue and calls {@link #wakeup()}.
+ *
+ * @param message the message to deliver ahead of the queued ones
+ */
+ void executeFirst(MessageDispatch message) {
+ messageQueue.enqueueFirst(message);
+ wakeup();
+ }
+
+ /**
+ * @return true when the queue is not closed, is running and is not empty
+ */
+ public boolean hasUncomsumedMessages() {
+ return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
+ }
+
+ /**
+ * Hands the message to the first consumer of the session whose id equals the
+ * message's consumer id. If no consumer matches, nothing is done with the
+ * message.
+ *
+ * @param message the message to deliver
+ */
+ void dispatch(MessageDispatch message) {
+
+ // TODO - we should use a Map for this indexed by consumerId
+
+ for (ActiveMQMessageConsumer consumer : this.session.consumers) {
+ ConsumerId consumerId = message.getConsumerId();
+ if (consumerId.equals(consumer.getConsumerId())) {
+ consumer.dispatch(message);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Starts the queue if it is not already running and, if messages are waiting,
+ * calls {@link #wakeup()}.
+ */
+ synchronized void start() {
+ if (!messageQueue.isRunning()) {
+ messageQueue.start();
+ if (hasUncomsumedMessages()) {
+ wakeup();
+ }
+ }
+ }
+
+ /**
+ * Stops the queue if it is running and shuts down the task runner, if one
+ * was created.
+ *
+ * @throws JMSException wrapping the InterruptedException if the thread is
+ * interrupted; the interrupt flag is restored first
+ */
+ void stop() throws JMSException {
+ try {
+ if (messageQueue.isRunning()) {
+ messageQueue.stop();
+ TaskRunner taskRunner = this.taskRunner;
+ if (taskRunner != null) {
+ // Clear the field before shutting down so a later wakeup() creates a fresh runner.
+ this.taskRunner = null;
+ taskRunner.shutdown();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * @return whether the message queue is running
+ */
+ boolean isRunning() {
+ return messageQueue.isRunning();
+ }
+
+ /** Closes the message queue. */
+ void close() {
+ messageQueue.close();
+ }
+
+ /** Clears the message queue. */
+ void clear() {
+ messageQueue.clear();
+ }
+
+ /**
+ * @return the result of a non-blocking dequeue on the message queue
+ */
+ MessageDispatch dequeueNoWait() {
+ return messageQueue.dequeueNoWait();
+ }
+
+ /** Clears the message queue; performs the same call as {@link #clear()}. */
+ protected void clearMessagesInProgress() {
+ messageQueue.clear();
+ }
+
+ /**
+ * @return whether the message queue is empty
+ */
+ public boolean isEmpty() {
+ return messageQueue.isEmpty();
+ }
+
+ /**
+ * Performs one unit of delivery work. Consumers are asked first; as soon as
+ * one of them reports work done, this method returns true. Only when no
+ * consumer had work is one message taken from the session queue and
+ * dispatched.
+ * <p/>
+ * NOTE(review): presumably this is the Task callback invoked by the
+ * TaskRunner created in {@link #wakeup()} - confirm against Task.
+ *
+ * @return true if another call may find more work; false if the session
+ * queue had nothing to dequeue
+ */
+ public boolean iterate() {
+
+ // Deliver any messages queued on the consumer to their listeners.
+ for (ActiveMQMessageConsumer consumer : this.session.consumers) {
+ if (consumer.iterate()) {
+ return true;
+ }
+ }
+
+ // No messages left queued on the listeners.. so now dispatch messages
+ // queued on the session
+ MessageDispatch message = messageQueue.dequeueNoWait();
+ if (message == null) {
+ return false;
+ } else {
+ dispatch(message);
+ return !messageQueue.isEmpty();
+ }
+ }
+
+ /**
+ * Returns the result of removeAll() on the message queue.
+ *
+ * @return the list returned by the queue; NOTE(review): presumably the
+ * removed MessageDispatch objects - confirm against
+ * MessageDispatchChannel
+ */
+ List getUnconsumedMessages() {
+ return messageQueue.removeAll();
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on
+ * a topic. A <CODE>TopicPublisher</CODE> object is the publish-subscribe
+ * form of a message producer.
+ * <p/>
+ * <P>
+ * Normally, the <CODE>Topic</CODE> is specified when a <CODE>TopicPublisher
+ * </CODE> is created. In this case, an attempt to use the <CODE>publish
+ * </CODE> methods for an unidentified <CODE>TopicPublisher</CODE> will throw
+ * a <CODE>java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * If the <CODE>TopicPublisher</CODE> is created with an unidentified <CODE>
+ * Topic</CODE>, an attempt to use the <CODE>publish</CODE> methods that
+ * assume that the <CODE>Topic</CODE> has been identified will throw a <CODE>
+ * java.lang.UnsupportedOperationException</CODE>.
+ * <p/>
+ * <P>
+ * During the execution of its <CODE>publish</CODE> method, a message must
+ * not be changed by other threads within the client. If the message is
+ * modified, the result of the <CODE>publish</CODE> is undefined.
+ * <p/>
+ * <P>
+ * After publishing a message, a client may retain and modify it without
+ * affecting the message that has been published. The same message object may
+ * be published multiple times.
+ * <p/>
+ * <P>
+ * The following message headers are set as part of publishing a message:
+ * <code>JMSDestination</code>,<code>JMSDeliveryMode</code>,<code>JMSExpiration</code>,
+ * <code>JMSPriority</code>,<code>JMSMessageID</code> and <code>JMSTimestamp</code>.
+ * When the message is published, the values of these headers are ignored.
+ * After completion of the <CODE>publish</CODE>, the headers hold the values
+ * specified by the method publishing the message. It is possible for the
+ * <CODE>publish</CODE> method not to set <code>JMSMessageID</code> and
+ * <code>JMSTimestamp</code> if the setting of these headers is explicitly
+ * disabled by the <code>MessageProducer.setDisableMessageID</code> or <code>MessageProducer.setDisableMessageTimestamp</code>
+ * method.
+ * <p/>
+ * <P>
+ * Creating a <CODE>MessageProducer</CODE> provides the same features as
+ * creating a <CODE>TopicPublisher</CODE>. A <CODE>MessageProducer</CODE>
+ * object is recommended when creating new code. The <CODE>TopicPublisher
+ * </CODE> is provided to support existing code.
+ * <p/>
+ * <p/>
+ * <P>
+ * Because <CODE>TopicPublisher</CODE> inherits from <CODE>MessageProducer
+ * </CODE>, it inherits the <CODE>send</CODE> methods that are a part of the
+ * <CODE>MessageProducer</CODE> interface. Using the <CODE>send</CODE>
+ * methods will have the same effect as using the <CODE>publish</CODE>
+ * methods: they are functionally the same.
+ *
+ * @see javax.jms.Session#createProducer(javax.jms.Destination)
+ * @see javax.jms.TopicSession#createPublisher(Topic)
+ */
+
+public class ActiveMQTopicPublisher extends ActiveMQMessageProducer implements
+ TopicPublisher {
+
+ /**
+ * Creates a publisher whose producer id is taken from
+ * <CODE>session.getNextProducerId()</CODE>.
+ *
+ * @param session the session that owns this publisher
+ * @param destination passed unchanged to the superclass constructor
+ * @param sendTimeout passed unchanged to the superclass constructor
+ * @throws JMSException declared by this constructor
+ */
+ protected ActiveMQTopicPublisher(ActiveMQSession session,
+ ActiveMQDestination destination, int sendTimeout) throws JMSException {
+ super(session, session.getNextProducerId(), destination,sendTimeout);
+ }
+
+ /**
+ * Gets the topic associated with this <CODE>TopicPublisher</CODE>. The
+ * value is the producer's destination cast to <CODE>Topic</CODE>.
+ *
+ * @return this publisher's topic
+ * @throws JMSException if the JMS provider fails to get the topic for this
+ * <CODE>TopicPublisher</CODE> due to some internal error.
+ */
+
+ public Topic getTopic() throws JMSException {
+ return (Topic) super.getDestination();
+ }
+
+ /**
+ * Publishes a message to the topic. Uses the <CODE>TopicPublisher</CODE>'s
+ * default delivery mode, priority, and time to live. Delegates to the
+ * superclass <CODE>send(Message)</CODE>.
+ *
+ * @param message the message to publish
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> with an invalid topic.
+ * @throws java.lang.UnsupportedOperationException
+ * if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> that did not specify a topic at creation time.
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+
+ public void publish(Message message) throws JMSException {
+ super.send(message);
+ }
+
+ /**
+ * Publishes a message to the topic, specifying delivery mode, priority,
+ * and time to live. Delegates to the superclass
+ * <CODE>send(Message, int, int, long)</CODE>.
+ *
+ * @param message the message to publish
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> with an invalid topic.
+ * @throws java.lang.UnsupportedOperationException
+ * if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> that did not specify a topic at creation time.
+ */
+
+ public void publish(Message message, int deliveryMode, int priority,
+ long timeToLive) throws JMSException {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * Publishes a message to a topic for an unidentified message producer.
+ * Uses the <CODE>TopicPublisher</CODE>'s default delivery mode,
+ * priority, and time to live. Delegates to the superclass
+ * <CODE>send(Destination, Message)</CODE>.
+ * <p/>
+ * <P>
+ * Typically, a message producer is assigned a topic at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the topic be supplied every time a message is published.
+ *
+ * @param topic the topic to publish this message to
+ * @param message the message to publish
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+
+ public void publish(Topic topic, Message message) throws JMSException {
+ super.send(topic, message);
+ }
+
+ /**
+ * Publishes a message to a topic for an unidentified message producer,
+ * specifying delivery mode, priority and time to live. Delegates to the
+ * superclass <CODE>send(Destination, Message, int, int, long)</CODE>.
+ * <p/>
+ * <P>
+ * Typically, a message producer is assigned a topic at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the topic be supplied every time a message is published.
+ *
+ * @param topic the topic to publish this message to
+ * @param message the message to publish
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+ */
+
+ public void publish(Topic topic, Message message, int deliveryMode,
+ int priority, long timeToLive) throws JMSException {
+ super.send(topic, message, deliveryMode, priority, timeToLive);
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A TopicSession implementation that throws IllegalStateExceptions when Queue
+ * operations are attempted but which delegates to another TopicSession for all
+ * other operations. The ActiveMQSessions implement both Topic and Queue
+ * Sessions methods but the spec states that TopicSession should throw
+ * Exceptions if queue operations are attempted on it.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ActiveMQTopicSession implements TopicSession {
+
+ private final TopicSession next;
+
+ public ActiveMQTopicSession(TopicSession next) {
+ this.next = next;
+ }
+
+ /**
+ * @throws JMSException
+ */
+ public void close() throws JMSException {
+ next.close();
+ }
+
+ /**
+ * @throws JMSException
+ */
+ public void commit() throws JMSException {
+ next.commit();
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws JMSException
+ */
+ public QueueBrowser createBrowser(Queue queue) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ */
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public BytesMessage createBytesMessage() throws JMSException {
+ return next.createBytesMessage();
+ }
+
+ /**
+ * @param destination
+ * @return
+ * @throws JMSException
+ */
+ public MessageConsumer createConsumer(Destination destination) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+ }
+ return next.createConsumer(destination);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+ }
+ return next.createConsumer(destination, messageSelector);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @param noLocal
+ * @return
+ * @throws JMSException
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+ }
+ return next.createConsumer(destination, messageSelector, noLocal);
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @return
+ * @throws JMSException
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+ return next.createDurableSubscriber(topic, name);
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @param messageSelector
+ * @param noLocal
+ * @return
+ * @throws JMSException
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+ return next.createDurableSubscriber(topic, name, messageSelector, noLocal);
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public MapMessage createMapMessage() throws JMSException {
+ return next.createMapMessage();
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public Message createMessage() throws JMSException {
+ return next.createMessage();
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public ObjectMessage createObjectMessage() throws JMSException {
+ return next.createObjectMessage();
+ }
+
+ /**
+ * @param object
+ * @return
+ * @throws JMSException
+ */
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+ return next.createObjectMessage(object);
+ }
+
+ /**
+ * @param destination
+ * @return
+ * @throws JMSException
+ */
+ public MessageProducer createProducer(Destination destination) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new InvalidDestinationException("Queues are not supported by a TopicSession");
+ }
+ return next.createProducer(destination);
+ }
+
+ /**
+ * @param topic
+ * @return
+ * @throws JMSException
+ */
+ public TopicPublisher createPublisher(Topic topic) throws JMSException {
+ return next.createPublisher(topic);
+ }
+
+ /**
+ * @param queueName
+ * @return
+ * @throws JMSException
+ */
+ public Queue createQueue(String queueName) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public StreamMessage createStreamMessage() throws JMSException {
+ return next.createStreamMessage();
+ }
+
+ /**
+ * @param topic
+ * @return
+ * @throws JMSException
+ */
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+ return next.createSubscriber(topic);
+ }
+
+ /**
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ * @return
+ * @throws JMSException
+ */
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+ return next.createSubscriber(topic, messageSelector, noLocal);
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public TemporaryQueue createTemporaryQueue() throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public TemporaryTopic createTemporaryTopic() throws JMSException {
+ return next.createTemporaryTopic();
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public TextMessage createTextMessage() throws JMSException {
+ return next.createTextMessage();
+ }
+
+ /**
+ * @param text
+ * @return
+ * @throws JMSException
+ */
+ public TextMessage createTextMessage(String text) throws JMSException {
+ return next.createTextMessage(text);
+ }
+
+ /**
+ * @param topicName
+ * @return
+ * @throws JMSException
+ */
+ public Topic createTopic(String topicName) throws JMSException {
+ return next.createTopic(topicName);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object arg0) {
+ return next.equals(arg0);
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public int getAcknowledgeMode() throws JMSException {
+ return next.getAcknowledgeMode();
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public MessageListener getMessageListener() throws JMSException {
+ return next.getMessageListener();
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ */
+ public boolean getTransacted() throws JMSException {
+ return next.getTransacted();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return next.hashCode();
+ }
+
+ /**
+ * @throws JMSException
+ */
+ public void recover() throws JMSException {
+ next.recover();
+ }
+
+ /**
+ * @throws JMSException
+ */
+ public void rollback() throws JMSException {
+ next.rollback();
+ }
+
+ /**
+ *
+ */
+ public void run() {
+ next.run();
+ }
+
+ /**
+ * @param listener
+ * @throws JMSException
+ */
+ public void setMessageListener(MessageListener listener) throws JMSException {
+ next.setMessageListener(listener);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return next.toString();
+ }
+
+ /**
+ * @param name
+ * @throws JMSException
+ */
+ public void unsubscribe(String name) throws JMSException {
+ next.unsubscribe(name);
+ }
+
+ public TopicSession getNext() {
+ return next;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
+ * that have been published to a topic. A <CODE>TopicSubscriber</CODE> object
+ * is the publish/subscribe form of a message consumer. A <CODE>
+ * MessageConsumer</CODE> can be created by using <CODE>
+ * Session.createConsumer</CODE>.
+ * <p/>
+ * <P>
+ * A <CODE>TopicSession</CODE> allows the creation of multiple <CODE>
+ * TopicSubscriber</CODE> objects per topic. It will deliver each message for
+ * a topic to each subscriber eligible to receive it. Each copy of the message
+ * is treated as a completely separate message. Work done on one copy has no
+ * effect on the others; acknowledging one does not acknowledge the others; one
+ * message may be delivered immediately, while another waits for its subscriber
+ * to process messages ahead of it.
+ * <p/>
+ * <P>
+ * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive
+ * only messages that are published while they are active.
+ * <p/>
+ * <P>
+ * Messages filtered out by a subscriber's message selector will never be
+ * delivered to the subscriber. From the subscriber's perspective, they do not
+ * exist.
+ * <p/>
+ * <P>
+ * In some cases, a connection may both publish and subscribe to a topic. The
+ * subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to inhibit
+ * the delivery of messages published by its own connection.
+ * <p/>
+ * <P>
+ * If a client needs to receive all the messages published on a topic,
+ * including the ones published while the subscriber is inactive, it uses a
+ * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record
+ * of this durable subscription and ensures that all messages from the topic's
+ * publishers are retained until they are acknowledged by this durable
+ * subscriber or they have expired.
+ * <p/>
+ * <P>
+ * Sessions with durable subscribers must always provide the same client
+ * identifier. In addition, each client must specify a name that uniquely
+ * identifies (within client identifier) each durable subscription it creates.
+ * Only one session at a time can have a <CODE>TopicSubscriber</CODE> for a
+ * particular durable subscription.
+ * <p/>
+ * <P>
+ * A client can change an existing durable subscription by creating a durable
+ * <CODE>TopicSubscriber</CODE> with the same name and a new topic and/or
+ * message selector. Changing a durable subscription is equivalent to
+ * unsubscribing (deleting) the old one and creating a new one.
+ * <p/>
+ * <P>
+ * The <CODE>unsubscribe</CODE> method is used to delete a durable
+ * subscription. The <CODE>unsubscribe</CODE> method can be used at the
+ * <CODE>Session</CODE> or <CODE>TopicSession</CODE> level. This method
+ * deletes the state being maintained on behalf of the subscriber by its
+ * provider.
+ * <p/>
+ * <P>
+ * Creating a <CODE>MessageConsumer</CODE> provides the same features as
+ * creating a <CODE>TopicSubscriber</CODE>. To create a durable subscriber,
+ * use of <CODE>Session.createDurableSubscriber</CODE> is recommended. The
+ * <CODE>TopicSubscriber</CODE> is provided to support existing code.
+ *
+ * @see javax.jms.Session#createConsumer
+ * @see javax.jms.Session#createDurableSubscriber
+ * @see javax.jms.TopicSession
+ * @see javax.jms.TopicSession#createSubscriber
+ * @see javax.jms.TopicSubscriber
+ * @see javax.jms.MessageConsumer
+ */
+
+public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements
+ TopicSubscriber {
+
+ /**
+ * Creates a subscriber by passing every argument unchanged to the
+ * ActiveMQMessageConsumer constructor, followed by a final
+ * <CODE>null</CODE> argument.
+ * NOTE(review): the meaning of that last superclass parameter is not
+ * visible here - confirm against ActiveMQMessageConsumer.
+ *
+ * @param theSession the session that owns this subscriber
+ * @param consumerId the id of this consumer
+ * @param dest the destination to subscribe to
+ * @param name forwarded to the superclass; NOTE(review): presumably the
+ * durable subscription name - confirm
+ * @param selector forwarded to the superclass
+ * @param prefetch forwarded to the superclass
+ * @param maximumPendingMessageCount forwarded to the superclass
+ * @param noLocalValue forwarded to the superclass
+ * @param browserValue forwarded to the superclass
+ * @param asyncDispatch forwarded to the superclass
+ * @throws JMSException declared by this constructor
+ */
+ protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
+ ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
+ boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
+ super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch, null);
+ }
+
+ /**
+ * Gets the <CODE>Topic</CODE> associated with this subscriber. Calls
+ * <CODE>checkClosed()</CODE> first, then returns the consumer's
+ * destination cast to <CODE>Topic</CODE>.
+ *
+ * @return this subscriber's <CODE>Topic</CODE>
+ * @throws JMSException if the JMS provider fails to get the topic for this topic
+ * subscriber due to some internal error.
+ */
+
+ public Topic getTopic() throws JMSException {
+ checkClosed();
+ return (Topic) super.getDestination();
+ }
+
+ /**
+ * Gets the <CODE>NoLocal</CODE> attribute for this subscriber. The
+ * default value for this attribute is false. Calls
+ * <CODE>checkClosed()</CODE> before reading the attribute.
+ *
+ * @return true if locally published messages are being inhibited
+ * @throws JMSException if the JMS provider fails to get the <CODE>NoLocal
+ * </CODE> attribute for this topic subscriber due to some
+ * internal error.
+ */
+
+ public boolean getNoLocal() throws JMSException {
+ checkClosed();
+ return super.isNoLocal();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+/**
+ * ActiveMQ's implementation of the optional JMS XA connection interfaces
+ * ({@link XAConnection}, {@link XAQueueConnection} and
+ * {@link XATopicConnection}).
+ * <p/>
+ * Every session handed out by this connection, including the ones requested
+ * through {@link #createSession(boolean, int)}, is an
+ * {@link ActiveMQXASession}.
+ * <p/>
+ * The XA interfaces are intended for JMS providers and transactional
+ * environments. Client programs are strongly encouraged to use the
+ * transactional support available in their environment, rather than use
+ * these XA interfaces directly.
+ *
+ * @version $Revision: 1.6 $
+ * @see javax.jms.Connection
+ * @see javax.jms.ConnectionFactory
+ * @see javax.jms.QueueConnection
+ * @see javax.jms.TopicConnection
+ * @see javax.jms.TopicConnectionFactory
+ * @see javax.jms.QueueConnectionFactory
+ */
+public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
+
+    /**
+     * Passes all arguments straight through to the {@link ActiveMQConnection}
+     * constructor.
+     *
+     * @param transport the transport handed to the superclass
+     * @param clientIdGenerator the id generator handed to the superclass
+     * @param factoryStats the statistics object handed to the superclass
+     * @throws Exception if the superclass constructor fails
+     */
+    protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+        super(transport, clientIdGenerator, factoryStats);
+    }
+
+    /**
+     * Creates a session via {@link #createSession(boolean, int)} and returns
+     * it as an {@link XASession}.
+     *
+     * @return the newly created XA session
+     * @throws JMSException if the session cannot be created
+     */
+    public XASession createXASession() throws JMSException {
+        final Session created = createSession(true, Session.SESSION_TRANSACTED);
+        return (XASession) created;
+    }
+
+    /**
+     * Creates a session via {@link #createSession(boolean, int)} and returns
+     * it as an {@link XATopicSession}.
+     *
+     * @return the newly created XA topic session
+     * @throws JMSException if the session cannot be created
+     */
+    public XATopicSession createXATopicSession() throws JMSException {
+        final Session created = createSession(true, Session.SESSION_TRANSACTED);
+        return (XATopicSession) created;
+    }
+
+    /**
+     * Creates a session via {@link #createSession(boolean, int)} and returns
+     * it as an {@link XAQueueSession}.
+     *
+     * @return the newly created XA queue session
+     * @throws JMSException if the session cannot be created
+     */
+    public XAQueueSession createXAQueueSession() throws JMSException {
+        final Session created = createSession(true, Session.SESSION_TRANSACTED);
+        return (XAQueueSession) created;
+    }
+
+    /**
+     * Creates a new {@link ActiveMQXASession}.
+     * <p/>
+     * Both arguments are ignored: the session is always constructed with
+     * {@link Session#SESSION_TRANSACTED} as its acknowledge mode.
+     *
+     * @param transacted not used
+     * @param acknowledgeMode not used
+     * @return a new {@link ActiveMQXASession} bound to this connection
+     * @throws JMSException if the closed/failed check, the sending of the
+     *                 connection info or the session construction fails
+     */
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+        final ActiveMQXASession xaSession =
+            new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync());
+        return xaSession;
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueConnectionFactory;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicConnectionFactory;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+
+/**
+ * A connection factory producing {@link XAConnection} instances. It
+ * implements the generic, queue and topic flavours of the JMS XA connection
+ * factory interfaces on top of {@link ActiveMQConnectionFactory}.
+ *
+ * @version $Revision: 564057 $
+ */
+public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory {
+
+    /**
+     * Creates a factory using the defaults of the superclass.
+     */
+    public ActiveMQXAConnectionFactory() {
+    }
+
+    /**
+     * @param brokerURL the broker URL handed to the superclass
+     */
+    public ActiveMQXAConnectionFactory(String brokerURL) {
+        super(brokerURL);
+    }
+
+    /**
+     * @param brokerURL the broker URI handed to the superclass
+     */
+    public ActiveMQXAConnectionFactory(URI brokerURL) {
+        super(brokerURL);
+    }
+
+    /**
+     * @param userName the user name handed to the superclass
+     * @param password the password handed to the superclass
+     * @param brokerURL the broker URL handed to the superclass
+     */
+    public ActiveMQXAConnectionFactory(String userName, String password, String brokerURL) {
+        super(userName, password, brokerURL);
+    }
+
+    /**
+     * @param userName the user name handed to the superclass
+     * @param password the password handed to the superclass
+     * @param brokerURL the broker URI handed to the superclass
+     */
+    public ActiveMQXAConnectionFactory(String userName, String password, URI brokerURL) {
+        super(userName, password, brokerURL);
+    }
+
+    /**
+     * Builds the connection object for the given transport. This factory
+     * always instantiates an {@link ActiveMQXAConnection}.
+     * NOTE(review): presumably this overrides the creation hook of
+     * {@link ActiveMQConnectionFactory} that the other
+     * <code>createActiveMQConnection</code> variants end up calling, which is
+     * what makes the casts in the <code>createXA*Connection</code> methods
+     * safe - confirm against the superclass.
+     *
+     * @param transport the transport the connection will use
+     * @param stats the statistics object handed to the connection
+     * @return a new {@link ActiveMQXAConnection}
+     * @throws Exception if the connection cannot be constructed
+     */
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+        return new ActiveMQXAConnection(transport, getClientIdGenerator(), stats);
+    }
+
+    /**
+     * @return a new connection, cast to {@link XAConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XAConnection createXAConnection() throws JMSException {
+        return (XAConnection) createActiveMQConnection();
+    }
+
+    /**
+     * @param userName the user name used to create the connection
+     * @param password the password used to create the connection
+     * @return a new connection, cast to {@link XAConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XAConnection createXAConnection(String userName, String password) throws JMSException {
+        return (XAConnection) createActiveMQConnection(userName, password);
+    }
+
+    /**
+     * @return a new connection, cast to {@link XAQueueConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XAQueueConnection createXAQueueConnection() throws JMSException {
+        return (XAQueueConnection) createActiveMQConnection();
+    }
+
+    /**
+     * @param userName the user name used to create the connection
+     * @param password the password used to create the connection
+     * @return a new connection, cast to {@link XAQueueConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XAQueueConnection createXAQueueConnection(String userName, String password) throws JMSException {
+        return (XAQueueConnection) createActiveMQConnection(userName, password);
+    }
+
+    /**
+     * @return a new connection, cast to {@link XATopicConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XATopicConnection createXATopicConnection() throws JMSException {
+        return (XATopicConnection) createActiveMQConnection();
+    }
+
+    /**
+     * @param userName the user name used to create the connection
+     * @param password the password used to create the connection
+     * @return a new connection, cast to {@link XATopicConnection}
+     * @throws JMSException if the connection cannot be created
+     */
+    public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException {
+        return (XATopicConnection) createActiveMQConnection(userName, password);
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicSession;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq.command.SessionId;
+
+/**
+ * An {@link ActiveMQSession} that exposes an {@link XAResource} so that its
+ * work can be driven by the Java Transaction API (JTA).
+ * <p/>
+ * An application server controls the transactional assignment of an XA
+ * session by obtaining its XAResource. It uses the XAResource to assign the
+ * session to a transaction, prepare and commit work on the transaction, and
+ * so on. The client of the application server is given what it thinks is a
+ * regular JMS Session, while the application server controls the transaction
+ * management of the underlying XA session behind the scenes.
+ * <p/>
+ * Consequently this session always reports itself as transacted, and local
+ * {@link #commit()} / {@link #rollback()} calls are rejected.
+ * <p/>
+ * The XASession interface is optional in JMS and is meant for JMS providers
+ * and transactional environments. Client programs are strongly encouraged
+ * to use the transactional support available in their environment, rather
+ * than use these XA interfaces directly.
+ *
+ * @version $Revision: 1.5 $
+ * @see javax.jms.Session
+ * @see javax.jms.QueueSession
+ * @see javax.jms.TopicSession
+ * @see javax.jms.XASession
+ */
+public class ActiveMQXASession extends ActiveMQSession implements QueueSession, TopicSession, XAQueueSession, XATopicSession {
+
+    /**
+     * Passes all arguments straight through to the {@link ActiveMQSession}
+     * constructor.
+     *
+     * @param connection the XA connection owning this session
+     * @param sessionId the id of this session
+     * @param theAcknowlegeMode the acknowledge mode handed to the superclass
+     * @param dispatchAsync the dispatch flag handed to the superclass
+     * @throws JMSException if the superclass constructor fails
+     */
+    public ActiveMQXASession(ActiveMQXAConnection connection, SessionId sessionId, int theAcknowlegeMode, boolean dispatchAsync) throws JMSException {
+        super(connection, sessionId, theAcknowlegeMode, dispatchAsync);
+    }
+
+    /**
+     * @return always <code>true</code>
+     */
+    public boolean getTransacted() throws JMSException {
+        return true;
+    }
+
+    /**
+     * Local commits are not supported on an XA session.
+     *
+     * @throws TransactionInProgressException always
+     */
+    public void commit() throws JMSException {
+        throw new TransactionInProgressException("Cannot commit() inside an XASession");
+    }
+
+    /**
+     * Local rollbacks are not supported on an XA session.
+     *
+     * @throws TransactionInProgressException always
+     */
+    public void rollback() throws JMSException {
+        throw new TransactionInProgressException("Cannot rollback() inside an XASession");
+    }
+
+    /**
+     * @return this very object
+     */
+    public Session getSession() throws JMSException {
+        return this;
+    }
+
+    /**
+     * @return a new {@link ActiveMQQueueSession} wrapping this session
+     */
+    public QueueSession getQueueSession() throws JMSException {
+        final QueueSession queueView = new ActiveMQQueueSession(this);
+        return queueView;
+    }
+
+    /**
+     * @return a new {@link ActiveMQTopicSession} wrapping this session
+     */
+    public TopicSession getTopicSession() throws JMSException {
+        final TopicSession topicView = new ActiveMQTopicSession(this);
+        return topicView;
+    }
+
+    /**
+     * @return the transaction context of this session, which serves as its
+     *         {@link XAResource}
+     */
+    public XAResource getXAResource() {
+        return getTransactionContext();
+    }
+
+    /**
+     * Called before transacted work is done by the session. XA work is only
+     * permitted while the transaction context reports that it is in an XA
+     * transaction; nothing is started here.
+     *
+     * @throws JMSException if the transaction context is not in an XA
+     *                 transaction
+     */
+    protected void doStartTransaction() throws JMSException {
+        if (getTransactionContext().isInXATransaction()) {
+            return;
+        }
+        throw new JMSException("Session's XAResource has not been enlisted in a distributed transaction.");
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A dispatcher that subscribes to
+ * {@link AdvisorySupport#TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC} on behalf
+ * of a connection and mirrors the add/remove advisories it receives into the
+ * connection's <code>activeTempDestinations</code> map.
+ */
+public class AdvisoryConsumer implements ActiveMQDispatcher {
+    private static final transient Log LOG = LogFactory.getLog(AdvisoryConsumer.class);
+
+    /** Number of dispatches received since the last acknowledgement was sent. */
+    int deliveredCounter;
+
+    private final ActiveMQConnection connection;
+    private ConsumerInfo info;
+    private boolean closed;
+
+    /**
+     * Builds the consumer info (prefetch of 1000, no-local), registers this
+     * object as the dispatcher for the consumer id and synchronously sends
+     * the consumer info over the connection.
+     *
+     * @param connection the connection to register with
+     * @param consumerId the id under which this consumer is registered
+     * @throws JMSException if sending the consumer info fails
+     */
+    public AdvisoryConsumer(ActiveMQConnection connection, ConsumerId consumerId) throws JMSException {
+        this.connection = connection;
+
+        final ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
+        this.info = consumerInfo;
+        consumerInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setNoLocal(true);
+
+        connection.addDispatcher(consumerInfo.getConsumerId(), this);
+        connection.syncSendPacket(consumerInfo);
+    }
+
+    /**
+     * Sends the remove command for this consumer and unregisters the
+     * dispatcher. Only the first call has an effect; a failure to send the
+     * remove command is logged and otherwise ignored.
+     */
+    public synchronized void dispose() {
+        if (closed) {
+            return;
+        }
+        try {
+            connection.asyncSendPacket(info.createRemoveCommand());
+        } catch (JMSException e) {
+            LOG.info("Failed to send remove command: " + e, e);
+        }
+        connection.removeDispatcher(info.getConsumerId());
+        closed = true;
+    }
+
+    /**
+     * Counts the dispatch, acknowledges once the threshold is exceeded and
+     * then processes the payload if it is exactly a {@link DestinationInfo}.
+     * Any other payload is only reported at debug level.
+     *
+     * @param md the dispatched message
+     */
+    public void dispatch(MessageDispatch md) {
+        acknowledgeWhenThresholdExceeded(md);
+
+        final DataStructure payload = md.getMessage().getDataStructure();
+        if (payload == null || payload.getClass() != DestinationInfo.class) {
+            // This can happen across networks
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unexpected message was dispatched to the AdvisoryConsumer: "+md);
+            }
+            return;
+        }
+        processDestinationInfo((DestinationInfo) payload);
+    }
+
+    /**
+     * Auto acks messages when more than 75% of the prefetch has been
+     * delivered. The counter is reset only if the ack could be sent; a send
+     * failure is reported to the connection as a client internal exception.
+     */
+    private void acknowledgeWhenThresholdExceeded(MessageDispatch md) {
+        deliveredCounter++;
+        if (deliveredCounter <= (0.75 * info.getPrefetchSize())) {
+            return;
+        }
+        try {
+            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
+            connection.asyncSendPacket(ack);
+            deliveredCounter = 0;
+        } catch (JMSException e) {
+            connection.onClientInternalException(e);
+        }
+    }
+
+    /**
+     * Adds or removes a temporary destination in the connection's
+     * <code>activeTempDestinations</code> map according to the operation type
+     * of the advisory. Non-temporary destinations and other operation types
+     * are ignored.
+     */
+    private void processDestinationInfo(DestinationInfo dinfo) {
+        final ActiveMQDestination destination = dinfo.getDestination();
+        if (destination.isTemporary()) {
+            final ActiveMQTempDestination temporary = (ActiveMQTempDestination) destination;
+            if (dinfo.getOperationType() == DestinationInfo.ADD_OPERATION_TYPE) {
+                connection.activeTempDestinations.put(temporary, temporary);
+            } else if (dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE) {
+                connection.activeTempDestinations.remove(temporary);
+            }
+        }
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception which is thrown if you try to access a resource which has
+ * already been closed.
+ * <p/>
+ * Every instance carries the message
+ * "Cannot use &lt;description&gt; as it has already been closed" and the
+ * error code "AMQ-1001".
+ *
+ * @version $Revision: 1.2 $
+ */
+public class AlreadyClosedException extends JMSException {
+
+    private static final long serialVersionUID = -3203104889571618702L;
+
+    /**
+     * Creates the exception for a closed connection.
+     * <p/>
+     * Delegates to {@link #AlreadyClosedException(String)} so that the
+     * message is the full sentence and the error code is set; calling the
+     * superclass constructor directly would leave the bare description
+     * "this connection" as the message and no error code.
+     */
+    public AlreadyClosedException() {
+        this("this connection");
+    }
+
+    /**
+     * Creates the exception for the named resource.
+     *
+     * @param description a phrase naming the closed resource; it is embedded
+     *                in the message as "Cannot use " + description + " as it
+     *                has already been closed"
+     */
+    public AlreadyClosedException(String description) {
+        super("Cannot use " + description + " as it has already been closed", "AMQ-1001");
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a message which carries a Binary Large Object (BLOB) that is
+ * typically transferred out of band.
+ *
+ * @version $Revision: $
+ */
+public interface BlobMessage extends Message {
+
+ /**
+ * Returns the input stream to process the BLOB.
+ *
+ * @return the stream from which the BLOB content can be read
+ * @throws IOException declared by the signature; the exact conditions depend on the implementation
+ * @throws JMSException declared by the signature; the exact conditions depend on the implementation
+ */
+ InputStream getInputStream() throws IOException, JMSException;
+
+ /**
+ * Returns the URL for the blob if it's available as an external URL (such as file, http, ftp etc)
+ * or null if there is no URL available.
+ *
+ * @return the external URL of the BLOB, or null if there is none
+ * @throws MalformedURLException declared by the signature; the exact conditions depend on the implementation
+ * @throws JMSException declared by the signature; the exact conditions depend on the implementation
+ */
+ URL getURL() throws MalformedURLException, JMSException;
+
+
+ /**
+ * Returns the MIME type of the BLOB, which can be used to apply different content types to messages.
+ *
+ * @return the MIME type of the BLOB
+ */
+ String getMimeType();
+
+ /**
+ * Sets the MIME type of the BLOB so that a consumer can process things nicely with a Java Activation Framework
+ * DataHandler.
+ *
+ * @param mimeType the MIME type of the BLOB
+ */
+ void setMimeType(String mimeType);
+
+
+ /**
+ * Returns the name of the attachment, which can be useful information if transmitting files over ActiveMQ.
+ *
+ * @return the name of the attachment
+ */
+ String getName();
+
+ /**
+ * Sets the name of the attachment, which can be useful information if transmitting files over ActiveMQ.
+ *
+ * @param name the name of the attachment
+ */
+ void setName(String name);
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+/**
+ * An exception listener similar to the standard <code>javax.jms.ExceptionListener</code>
+ * which can be used by client code to be notified of exceptions thrown by container components
+ * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
+ * <p>
+ * The <code>org.apache.activemq.ActiveMQConnection</code> that the listener has been registered with does
+ * this by calling the listener's <code>onException()</code> method, passing it a <code>Throwable</code> describing
+ * the problem.
+ * </p>
+ * <p>
+ * Unlike <code>javax.jms.ExceptionListener</code>, whose callback takes a <code>JMSException</code>,
+ * the callback of this listener accepts any <code>Throwable</code>.
+ * </p>
+ *
+ * @author Kai Hudalla
+ * @see ActiveMQConnection#setClientInternalExceptionListener(org.apache.activemq.ClientInternalExceptionListener)
+ */
+public interface ClientInternalExceptionListener
+{
+ /**
+ * Notifies a client of an exception while asynchronously processing a message.
+ *
+ * @param exception the exception describing the problem
+ */
+ void onException(Throwable exception);
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * Provides a uniform interface that can be used to close all the JMS objects
+ * that provide a close() method. Useful for when you want to collect a
+ * heterogeneous set of JMS objects in a collection to be closed at a later time.
+ *
+ * @version $Revision: 1.2 $
+ */
+public interface Closeable {
+
+ /**
+ * Closes a JMS object.
+ * <P>
+ * Many JMS objects are closeable such as Connections, Sessions, Consumers
+ * and Producers.
+ *
+ * @throws JMSException if the JMS provider fails to close the object due to
+ * some internal error.
+ */
+ void close() throws JMSException;
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception thrown when a service is not correctly configured.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ConfigurationException extends JMSException {
+ private static final long serialVersionUID = 5639082552451065258L;
+
+ /**
+ * Creates the exception with the given description as its message and
+ * the fixed error code "AMQ-1002".
+ *
+ * @param description the description of the configuration problem
+ */
+ public ConfigurationException(String description) {
+ super(description, "AMQ-1002");
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.LinkedHashMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * An auditor class for a Connection that looks for duplicates.
+ * <p/>
+ * Messages whose destination is a queue are audited per destination; all
+ * other messages are audited per dispatcher. The audits are kept in two
+ * {@link LRUCache} instances created with an argument of 1000 (presumably
+ * the maximum number of cached entries - confirm against LRUCache).
+ * <p/>
+ * All methods that touch the two audit maps are synchronized on this
+ * object.
+ */
+class ConnectionAudit {
+
+    private boolean checkForDuplicates;
+    private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit> destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
+    private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit> dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
+
+    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+
+    /**
+     * Forgets the audit kept for the given dispatcher.
+     *
+     * @param dispatcher the dispatcher whose audit is dropped
+     */
+    synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
+        dispatchers.remove(dispatcher);
+    }
+
+    /**
+     * Checks the message against the matching audit, creating that audit on
+     * first use.
+     *
+     * @param dispatcher the dispatcher the message was delivered to; selects
+     *                the audit when the destination is not a queue
+     * @param message the message to check, may be null
+     * @return the verdict of the audit, or <code>false</code> when duplicate
+     *         checking is disabled, the message is null or it has no
+     *         destination
+     */
+    synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
+        if (!checkForDuplicates || message == null) {
+            return false;
+        }
+        ActiveMQDestination destination = message.getDestination();
+        if (destination == null) {
+            return false;
+        }
+        ActiveMQMessageAudit audit = destination.isQueue()
+            ? findOrCreateAudit(destinations, destination)
+            : findOrCreateAudit(dispatchers, dispatcher);
+        return audit.isDuplicate(message);
+    }
+
+    /**
+     * Rolls the message back in the matching audit, if such an audit exists.
+     * <p/>
+     * Synchronized like the other methods that use the audit maps: the maps
+     * are mutated under this object's monitor by
+     * {@link #isDuplicate(ActiveMQDispatcher, Message)} and
+     * {@link #removeDispatcher(ActiveMQDispatcher)}, so reading them here
+     * without the monitor would race with those updates.
+     *
+     * @param dispatcher the dispatcher the message was delivered to; selects
+     *                the audit when the destination is not a queue
+     * @param message the message to roll back, may be null
+     */
+    protected synchronized void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
+        if (!checkForDuplicates || message == null) {
+            return;
+        }
+        ActiveMQDestination destination = message.getDestination();
+        if (destination == null) {
+            return;
+        }
+        ActiveMQMessageAudit audit = destination.isQueue()
+            ? destinations.get(destination)
+            : dispatchers.get(dispatcher);
+        if (audit != null) {
+            audit.rollback(message);
+        }
+    }
+
+    /**
+     * Returns the audit stored under the key, creating and storing a new one
+     * with the current depth and producer limits when there is none. Callers
+     * must hold this object's monitor.
+     */
+    private <K> ActiveMQMessageAudit findOrCreateAudit(LinkedHashMap<K, ActiveMQMessageAudit> audits, K key) {
+        ActiveMQMessageAudit audit = audits.get(key);
+        if (audit == null) {
+            audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
+            audits.put(key, audit);
+        }
+        return audit;
+    }
+
+    /**
+     * @return the checkForDuplicates
+     */
+    boolean isCheckForDuplicates() {
+        return this.checkForDuplicates;
+    }
+
+    /**
+     * @param checkForDuplicates the checkForDuplicates to set
+     */
+    void setCheckForDuplicates(boolean checkForDuplicates) {
+        this.checkForDuplicates = checkForDuplicates;
+    }
+
+    /**
+     * @return the depth passed to newly created audits
+     */
+    public int getAuditDepth() {
+        return auditDepth;
+    }
+
+    /**
+     * @param auditDepth the depth to pass to audits created from now on;
+     *                audits that already exist are not changed
+     */
+    public void setAuditDepth(int auditDepth) {
+        this.auditDepth = auditDepth;
+    }
+
+    /**
+     * @return the maximum producer number passed to newly created audits
+     */
+    public int getAuditMaximumProducerNumber() {
+        return auditMaximumProducerNumber;
+    }
+
+    /**
+     * @param auditMaximumProducerNumber the maximum producer number to pass
+     *                to audits created from now on; audits that already
+     *                exist are not changed
+     */
+    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when attempt is made to use a connection when the connection has been closed.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class ConnectionClosedException extends IllegalStateException {
+ private static final long serialVersionUID = -7681404582227153308L;
+
+ public ConnectionClosedException() {
+ super("The connection is already closed", "AlreadyClosed");
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+/**
+ * An exception thrown when the a connection failure is detected (peer might
+ * close the connection, or a keep alive times out, etc.)
+ *
+ * @version $Revision$
+ */
+public class ConnectionFailedException extends JMSException {
+
+ private static final long serialVersionUID = 2288453203492073973L;
+
+ public ConnectionFailedException(IOException cause) {
+ super("The JMS connection has failed: " + extractMessage(cause));
+ initCause(cause);
+ setLinkedException(cause);
+ }
+
+ public ConnectionFailedException() {
+ super("The JMS connection has failed due to a Transport problem");
+ }
+
+ private static String extractMessage(IOException cause) {
+ String m = cause.getMessage();
+ if (m == null || m.length() == 0) {
+ m = cause.toString();
+ }
+ return m;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Represents a hook to allow the support of custom destinations
+ * such as to support <a href="http://activemq.apache.org/camel/">Apache Camel</a>
+ * to create and manage endpoints
+ *
+ * @version $Revision: $
+ */
+public interface CustomDestination extends Destination {
+
+ // Consumers
+ //-----------------------------------------------------------------------
+ MessageConsumer createConsumer(ActiveMQSession session, String messageSelector);
+ MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal);
+
+ TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal);
+ TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal);
+
+ QueueReceiver createReceiver(ActiveMQSession session, String messageSelector);
+
+ // Producers
+ //-----------------------------------------------------------------------
+ MessageProducer createProducer(ActiveMQSession session) throws JMSException;
+
+ TopicPublisher createPublisher(ActiveMQSession session) throws JMSException;
+
+ QueueSender createSender(ActiveMQSession session) throws JMSException;
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * @version $Revision$
+ */
+public interface Disposable {
+
+ /**
+ */
+ void dispose();
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.TopicConnection;
+import javax.jms.QueueConnection;
+import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.DestinationSource;
+
+/**
+ * A set of enhanced APIs for a JMS provider
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface EnhancedConnection extends TopicConnection, QueueConnection, Closeable {
+
+ /**
+ * Returns the {@link DestinationSource} object which can be used to listen to destinations
+ * being created or destroyed or to enquire about the current destinations available on the broker
+ *
+ * @return a lazily created destination source
+ * @throws JMSException
+ */
+ DestinationSource getDestinationSource() throws JMSException;
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public interface LocalTransactionEventListener {
+ void beginEvent();
+
+ void commitEvent();
+
+ void rollbackEvent();
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Represents the JMS extension methods in Apache ActiveMQ
+ *
+ * @version $Revision: $
+ */
+ public interface Message extends javax.jms.Message {
+
+ /**
+ * Returns the MIME type of this message. This can be used in selectors to filter on
+ * the MIME types of the different JMS messages, or in the case of {@link org.apache.activemq.BlobMessage}
+ * it allows you to create a selector on the MIME type of the BLOB body.
+ *
+ * @return the MIME type of this message
+ */
+ String getJMSXMimeType();
+
+ }
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * An extended JMS interface that adds the ability to be notified when a
+ * message is available for consumption using the receive*() methods
+ * which is useful in Ajax style subscription models.
+ *
+ * @version $Revision: 1.1 $
+ */
+ public interface MessageAvailableConsumer extends MessageConsumer {
+
+ /**
+ * Sets the listener used to notify synchronous consumers that there is a message
+ * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+ *
+ * @param availableListener the listener to notify; NOTE(review): whether null
+ * clears a previously set listener is not specified here - confirm
+ */
+ void setAvailableListener(MessageAvailableListener availableListener);
+
+ /**
+ * Gets the listener used to notify synchronous consumers that there is a message
+ * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+ *
+ * @return the listener passed to {@link #setAvailableListener(MessageAvailableListener)}
+ */
+ MessageAvailableListener getAvailableListener();
+ }