You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC
svn commit: r781177 [5/11] - in /activemq/sandbox/activemq-flow:
activemq-bio/ activemq-bio/src/main/java/org/
activemq-bio/src/main/java/org/apache/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transpo...
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,1184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.management.JMSConsumerStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+ * from a destination. A <CODE> MessageConsumer</CODE> object is created by
+ * passing a <CODE>Destination</CODE> object to a message-consumer creation
+ * method supplied by a session.
+ * <P>
+ * <CODE>MessageConsumer</CODE> is the parent interface for all message
+ * consumers.
+ * <P>
+ * A message consumer can be created with a message selector. A message selector
+ * allows the client to restrict the messages delivered to the message consumer
+ * to those that match the selector.
+ * <P>
+ * A client may either synchronously receive a message consumer's messages or
+ * have the consumer asynchronously deliver them as they arrive.
+ * <P>
+ * For synchronous receipt, a client can request the next message from a message
+ * consumer using one of its <CODE> receive</CODE> methods. There are several
+ * variations of <CODE>receive</CODE> that allow a client to poll or wait for
+ * the next message.
+ * <P>
+ * For asynchronous delivery, a client can register a
+ * <CODE>MessageListener</CODE> object with a message consumer. As messages
+ * arrive at the message consumer, it delivers them by calling the
+ * <CODE>MessageListener</CODE>'s<CODE>
+ * onMessage</CODE> method.
+ * <P>
+ * It is a client programming error for a <CODE>MessageListener</CODE> to
+ * throw an exception.
+ *
+ * @version $Revision: 1.22 $
+ * @see javax.jms.MessageConsumer
+ * @see javax.jms.QueueReceiver
+ * @see javax.jms.TopicSubscriber
+ * @see javax.jms.Session
+ */
+public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
+
+ private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
+ // Shared scheduler; rollback() uses it to restart delivery after the redelivery delay.
+ protected static final Scheduler scheduler = Scheduler.getInstance();
+ protected final ActiveMQSession session;
+ // Description of this consumer; the constructor sends it to the broker.
+ protected final ConsumerInfo info;
+
+ // These are the messages waiting to be delivered to the client
+ private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+
+ // These are the messages that were delivered to the consumer but that have
+ // not been acknowledged. New entries are added at the head, so the list is
+ // newest-first: getFirst() is the latest delivery, getLast() the oldest.
+ private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
+ // Number of deliveries folded into the current pendingAck (see ackLater()).
+ private int deliveredCounter;
+ // Subtracted from deliveredCounter when ackLater() decides whether to flush pendingAck.
+ private int additionalWindowSize;
+ // Delay applied before redelivery restarts after a rollback; reset to 0 on commit.
+ private long redeliveryDelay;
+ // Messages consumed since the last optimized acknowledgement was sent.
+ private int ackCounter;
+ private int dispatchedCount;
+ private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
+ private JMSConsumerStatsImpl stats;
+
+ private final String selector;
+ // True while a transaction Synchronization registered by ackLater() is outstanding.
+ private boolean synchronizationRegistered;
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ private MessageAvailableListener availableListener;
+
+ private RedeliveryPolicy redeliveryPolicy;
+ private boolean optimizeAcknowledge;
+ // Guard flag: true while one thread is building or sending an acknowledgement.
+ private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
+ // Lazily created by deliverAcks(); shut down in dispose().
+ private ExecutorService executorService;
+ private MessageTransformer transformer;
+ // Set by clearMessagesInProgress(); dispatch() checks it to flush unconsumed messages.
+ private boolean clearDispatchList;
+
+ // Most recent ack that has been built by ackLater() but not yet sent.
+ private MessageAck pendingAck;
+ // Broker sequence id of the last message handed to the client; sent with the RemoveInfo on close.
+ private long lastDeliveredSequenceId;
+
+ // If set, dequeue() rethrows it as a JMSException once no message can be returned.
+ private IOException failureError;
+
+ /**
+  * Creates a message consumer, registers it with the session and announces it
+  * to the broker. If the connection is already started the consumer is started
+  * as well.
+  *
+  * @param session the session that owns this consumer
+  * @param consumerId the id assigned to this consumer
+  * @param dest the destination to consume from; must be non-null and have a
+  *            physical name. A temporary destination must belong to this
+  *            session's connection and must not have been deleted.
+  * @param name the subscription name (used for durable subscriptions), may be null
+  * @param selector the message selector, or null/blank for none
+  * @param prefetch the prefetch size; must not be negative
+  * @param maximumPendingMessageCount the maximum pending message limit passed to the broker
+  * @param noLocal whether locally produced messages should be excluded
+  * @param browser whether this consumer is a browser
+  * @param dispatchAsync whether the broker should dispatch asynchronously
+  * @param messageListener optional listener to install immediately, may be null
+  * @throws JMSException if the destination, prefetch size or selector is invalid,
+  *             or if registering the consumer with the broker fails
+  */
+ public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
+         String name, String selector, int prefetch,
+         int maximumPendingMessageCount, boolean noLocal, boolean browser,
+         boolean dispatchAsync, MessageListener messageListener) throws JMSException {
+     if (dest == null) {
+         throw new InvalidDestinationException("Don't understand null destinations");
+     } else if (dest.getPhysicalName() == null) {
+         throw new InvalidDestinationException("The destination object was not given a physical name.");
+     } else if (dest.isTemporary()) {
+         // The physical name is known to be non-null here (checked by the branch above).
+         String physicalName = dest.getPhysicalName();
+         String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
+
+         if (physicalName.indexOf(connectionID) < 0) {
+             throw new InvalidDestinationException(
+                 "Cannot use a Temporary destination from another Connection");
+         }
+
+         if (session.connection.isDeleted(dest)) {
+             throw new InvalidDestinationException(
+                 "Cannot use a Temporary destination that has been deleted");
+         }
+     }
+
+     // Validate the prefetch size for every destination type. Previously this
+     // check was nested in the temporary-destination branch and was skipped
+     // for ordinary queues and topics.
+     if (prefetch < 0) {
+         throw new JMSException("Cannot have a prefetch size less than zero");
+     }
+
+     this.session = session;
+     this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
+     setTransformer(session.getTransformer());
+
+     this.info = new ConsumerInfo(consumerId);
+     this.info.setExclusive(this.session.connection.isExclusiveConsumer());
+     this.info.setSubscriptionName(name);
+     this.info.setPrefetchSize(prefetch);
+     this.info.setCurrentPrefetchSize(prefetch);
+     this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
+     this.info.setNoLocal(noLocal);
+     this.info.setDispatchAsync(dispatchAsync);
+     this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
+     this.info.setSelector(null);
+
+     // Allows the options on the destination to configure the consumerInfo
+     if (dest.getOptions() != null) {
+         Map<String, String> options = new HashMap<String, String>(dest.getOptions());
+         IntrospectionSupport.setProperties(this.info, options, "consumer.");
+     }
+
+     this.info.setDestination(dest);
+     this.info.setBrowser(browser);
+     try {
+         if (selector != null && selector.trim().length() != 0) {
+             // An explicit selector wins; validate it before use.
+             SelectorParser.parse(selector);
+             this.info.setSelector(selector);
+             this.selector = selector;
+         } else if (info.getSelector() != null) {
+             // A selector may have been supplied through the destination options.
+             SelectorParser.parse(this.info.getSelector());
+             this.selector = this.info.getSelector();
+         } else {
+             this.selector = null;
+         }
+     } catch (FilterException e) {
+         throw JMSExceptionSupport.createInvalidSelectorException(e);
+     }
+
+     this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
+     this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
+                                && !info.isBrowser();
+     this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
+
+     if (messageListener != null) {
+         setMessageListener(messageListener);
+     }
+     try {
+         this.session.addConsumer(this);
+         this.session.syncSendPacket(info);
+     } catch (JMSException e) {
+         // Undo the local registration if the broker rejected the consumer.
+         this.session.removeConsumer(this);
+         throw e;
+     }
+
+     if (session.connection.isStarted()) {
+         start();
+     }
+ }
+
+ /**
+  * @return the statistics gathered for this consumer, typed as the generic {@link StatsImpl}
+  */
+ public StatsImpl getStats() {
+     return this.stats;
+ }
+
+ /**
+  * @return the consumer-specific statistics object for this consumer
+  */
+ public JMSConsumerStatsImpl getConsumerStats() {
+     return this.stats;
+ }
+
+ /**
+  * @return the redelivery policy currently applied when messages are rolled back
+  */
+ public RedeliveryPolicy getRedeliveryPolicy() {
+     return this.redeliveryPolicy;
+ }
+
+ /**
+  * Replaces the policy that governs how rolled-back messages are redelivered.
+  *
+  * @param redeliveryPolicy the policy to use from now on
+  */
+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+     this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ /**
+  * @return the transformer applied to received messages, or null if none is set
+  */
+ public MessageTransformer getTransformer() {
+     return this.transformer;
+ }
+
+ /**
+  * Installs the transformer that is applied to each message before it is
+  * handed to the client.
+  *
+  * @param transformer the transformer to use, or null to disable transformation
+  */
+ public void setTransformer(MessageTransformer transformer) {
+     this.transformer = transformer;
+ }
+
+ /**
+  * @return the id of this consumer, as recorded in its {@link ConsumerInfo}
+  */
+ public ConsumerId getConsumerId() {
+     return this.info.getConsumerId();
+ }
+
+ /**
+  * @return the subscription name of this consumer - used for durable consumers
+  */
+ public String getConsumerName() {
+     final String subscriptionName = this.info.getSubscriptionName();
+     return subscriptionName;
+ }
+
+ /**
+  * @return true if this consumer refuses messages produced locally
+  */
+ protected boolean isNoLocal() {
+     return this.info.isNoLocal();
+ }
+
+ /**
+  * Reports whether this consumer was created as a browser.
+  *
+  * @return true if a browser
+  */
+ protected boolean isBrowser() {
+     return this.info.isBrowser();
+ }
+
+ /**
+  * @return the destination this consumer receives messages from
+  */
+ protected ActiveMQDestination getDestination() {
+     return this.info.getDestination();
+ }
+
+ /**
+  * @return the configured prefetch size of this consumer
+  */
+ public int getPrefetchNumber() {
+     return this.info.getPrefetchSize();
+ }
+
+ /**
+  * A consumer is a durable subscriber when it has a subscription name and its
+  * destination is a topic.
+  *
+  * @return true if this is a durable topic subscriber
+  */
+ public boolean isDurableSubscriber() {
+     if (info.getSubscriptionName() == null) {
+         return false;
+     }
+     return info.getDestination().isTopic();
+ }
+
+ /**
+  * Returns the message selector expression in effect for this consumer.
+  *
+  * @return the selector, or null if none exists (it was not set, or was set
+  *         to null or a blank string)
+  * @throws JMSException if this consumer has been closed
+  */
+ public String getMessageSelector() throws JMSException {
+     checkClosed();
+     return this.selector;
+ }
+
+ /**
+  * Returns the <CODE>MessageListener</CODE> currently registered on this
+  * consumer.
+  *
+  * @return the registered listener, or null if no listener is set
+  * @throws JMSException if this consumer has been closed
+  * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
+  */
+ public MessageListener getMessageListener() throws JMSException {
+     checkClosed();
+     final MessageListener current = this.messageListener.get();
+     return current;
+ }
+
+ /**
+  * Registers (or, when passed null, removes) the <CODE>MessageListener</CODE>
+  * of this consumer.
+  * <P>
+  * When a non-null listener is installed and the session is running, the
+  * session is stopped, messages already queued for this consumer are
+  * redispatched through the session, and the session is started again.
+  * <P>
+  * The effect of calling this method while messages are being consumed by an
+  * existing listener, or while the consumer is used synchronously, is
+  * undefined.
+  *
+  * @param listener the listener to deliver messages to, or null to unset it
+  * @throws JMSException if the consumer is closed or has a prefetch size of zero
+  * @see javax.jms.MessageConsumer#getMessageListener
+  */
+ public void setMessageListener(MessageListener listener) throws JMSException {
+     checkClosed();
+     if (info.getPrefetchSize() == 0) {
+         throw new JMSException(
+             "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
+     }
+
+     if (listener == null) {
+         // Unsetting needs no redispatch.
+         this.messageListener.set(null);
+         return;
+     }
+
+     final boolean resumeAfterwards = session.isRunning();
+     if (resumeAfterwards) {
+         session.stop();
+     }
+
+     this.messageListener.set(listener);
+     session.redispatch(this, unconsumedMessages);
+
+     if (resumeAfterwards) {
+         session.start();
+     }
+ }
+
+ /**
+  * @return the listener that is notified when a message becomes available, or null
+  */
+ public MessageAvailableListener getAvailableListener() {
+     return this.availableListener;
+ }
+
+ /**
+  * Registers the listener that tells synchronous consumers a message is
+  * available, so that {@link MessageConsumer#receiveNoWait()} can then be
+  * called.
+  *
+  * @param availableListener the listener to notify, or null to remove it
+  */
+ public void setAvailableListener(MessageAvailableListener availableListener) {
+     this.availableListener = availableListener;
+ }
+
+ /**
+ * Used to get an enqueued message from the unconsumedMessages list. The
+ * amount of time this method blocks is based on the timeout value:
+ * - if timeout==-1 then it blocks until a message is received;
+ * - if timeout==0 then it tries to not block at all and returns a message
+ * only if one is available;
+ * - if timeout>0 then it blocks for up to timeout milliseconds.
+ * Expired messages are consumed (acknowledged and skipped) by this method.
+ *
+ * @param timeout -1, 0 or a positive number of milliseconds, as described above
+ * @throws JMSException if a failure error has been recorded for this consumer
+ * or the waiting thread is interrupted
+ * @return null if we time out, if the consumer is closed, or if a dispatch
+ * without a message is dequeued.
+ */
+ private MessageDispatch dequeue(long timeout) throws JMSException {
+ try {
+ long deadline = 0;
+ if (timeout > 0) {
+ deadline = System.currentTimeMillis() + timeout;
+ }
+ while (true) {
+ MessageDispatch md = unconsumedMessages.dequeue(timeout);
+ if (md == null) {
+ if (timeout > 0 && !unconsumedMessages.isClosed()) {
+ // Nothing yet but the channel is still open: wait for the remaining time.
+ timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+ } else {
+ if (failureError != null) {
+ throw JMSExceptionSupport.create(failureError);
+ } else {
+ return null;
+ }
+ }
+ } else if (md.getMessage() == null) {
+ // A dispatch that carries no message ends the wait.
+ return null;
+ } else if (md.getMessage().isExpired()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getConsumerId() + " received expired message: " + md);
+ }
+ // Run the normal consume bookkeeping so the expired message gets acked, then keep looking.
+ beforeMessageIsConsumed(md);
+ afterMessageIsConsumed(md, true);
+ if (timeout > 0) {
+ timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getConsumerId() + " received message: " + md);
+ }
+ return md;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status for callers before converting to a JMSException.
+ Thread.currentThread().interrupt();
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ /**
+  * Receives the next message produced for this message consumer, waiting
+  * without a time limit until one arrives or the consumer is closed.
+  * <P>
+  * If this <CODE>receive</CODE> is done within a transaction, the consumer
+  * retains the message until the transaction commits.
+  *
+  * @return the next message produced for this message consumer, or null if
+  *         this message consumer is concurrently closed
+  * @throws JMSException if the consumer is closed, a listener is in use, or
+  *             receiving fails
+  */
+ public Message receive() throws JMSException {
+     checkClosed();
+     checkMessageListener();
+
+     sendPullCommand(0);
+     final MessageDispatch dispatched = dequeue(-1);
+     if (dispatched != null) {
+         beforeMessageIsConsumed(dispatched);
+         afterMessageIsConsumed(dispatched, false);
+         return createActiveMQMessage(dispatched);
+     }
+     return null;
+ }
+
+ /**
+  * Builds the message object handed to the client from a dispatched message.
+  * The dispatched message is copied, passed through the consumer transformer
+  * if one is set, and given an acknowledge callback when the session uses
+  * client or individual acknowledgement.
+  *
+  * @param md the dispatch whose message should be delivered
+  * @return the copy (or its transformed replacement) to give to the client
+  * @throws JMSException if transforming the message fails
+  */
+ private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
+     ActiveMQMessage result = (ActiveMQMessage)md.getMessage().copy();
+
+     if (transformer != null) {
+         Message transformed = transformer.consumerTransform(session, this, result);
+         if (transformed != null) {
+             result = ActiveMQMessageTransformation.transformMessage(transformed, session.connection);
+         }
+     }
+
+     final Callback ackCallback;
+     if (session.isClientAcknowledge()) {
+         // Client ack: acknowledging one message acknowledges the whole session.
+         ackCallback = new Callback() {
+             public void execute() throws Exception {
+                 session.checkClosed();
+                 session.acknowledge();
+             }
+         };
+     } else if (session.isIndividualAcknowledge()) {
+         // Individual ack: only this dispatch is acknowledged.
+         ackCallback = new Callback() {
+             public void execute() throws Exception {
+                 session.checkClosed();
+                 acknowledge(md);
+             }
+         };
+     } else {
+         ackCallback = null;
+     }
+
+     if (ackCallback != null) {
+         result.setAcknowledgeCallback(ackCallback);
+     }
+     return result;
+ }
+
+ /**
+  * Receives the next message that arrives within the specified timeout
+  * interval.
+  * <P>
+  * This call blocks until a message arrives, the timeout expires, or this
+  * message consumer is closed. A <CODE>timeout</CODE> of zero never
+  * expires, and the call blocks indefinitely (it delegates to
+  * {@link #receive()}). A negative timeout returns null without waiting.
+  *
+  * @param timeout the timeout value (in milliseconds), a time out of zero
+  *            never expires.
+  * @return the next message produced for this message consumer, or null if
+  *         the timeout expires or this message consumer is concurrently
+  *         closed
+  * @throws JMSException if the consumer is closed, a listener is in use, or
+  *             receiving fails
+  */
+ public Message receive(long timeout) throws JMSException {
+     checkClosed();
+     checkMessageListener();
+     if (timeout == 0) {
+         return this.receive();
+     }
+
+     sendPullCommand(timeout);
+     if (timeout < 0) {
+         // Nothing to wait for. (This used to be expressed as a
+         // "while (timeout > 0)" loop that never ran more than once.)
+         return null;
+     }
+
+     MessageDispatch md;
+     if (info.getPrefetchSize() == 0) {
+         // We let the broker let us know when we timeout.
+         md = dequeue(-1);
+     } else {
+         md = dequeue(timeout);
+     }
+
+     if (md == null) {
+         return null;
+     }
+
+     beforeMessageIsConsumed(md);
+     afterMessageIsConsumed(md, false);
+     return createActiveMQMessage(md);
+ }
+
+ /**
+  * Receives the next message if one is immediately available.
+  * <P>
+  * With a prefetch size of zero the call waits on the local queue without a
+  * time limit, relying on the broker to signal when the pull has timed out.
+  *
+  * @return the next message produced for this message consumer, or null if
+  *         one is not available
+  * @throws JMSException if the JMS provider fails to receive the next
+  *             message due to some internal error.
+  */
+ public Message receiveNoWait() throws JMSException {
+     checkClosed();
+     checkMessageListener();
+     sendPullCommand(-1);
+
+     // -1: wait until the broker answers the pull; 0: do not block locally.
+     final long localWait = (info.getPrefetchSize() == 0) ? -1 : 0;
+     final MessageDispatch dispatched = dequeue(localWait);
+     if (dispatched == null) {
+         return null;
+     }
+
+     beforeMessageIsConsumed(dispatched);
+     afterMessageIsConsumed(dispatched, false);
+     return createActiveMQMessage(dispatched);
+ }
+
+ /**
+  * Closes the message consumer.
+  * <P>
+  * Since a provider may allocate some resources on behalf of a <CODE>
+  * MessageConsumer</CODE> outside the Java virtual machine, clients should
+  * close them when they are not needed rather than rely on garbage
+  * collection.
+  * <P>
+  * If the session is currently inside a transaction, the actual close is
+  * deferred until that transaction commits or rolls back. Closing an
+  * already closed consumer does nothing.
+  *
+  * @throws JMSException if the JMS provider fails to close the consumer due
+  *             to some internal error.
+  */
+ public void close() throws JMSException {
+     if (unconsumedMessages.isClosed()) {
+         return;
+     }
+     if (!session.getTransactionContext().isInTransaction()) {
+         doClose();
+         return;
+     }
+     // In a transaction: close once its outcome is known.
+     session.getTransactionContext().addSynchronization(new Synchronization() {
+         public void afterCommit() throws Exception {
+             doClose();
+         }
+
+         public void afterRollback() throws Exception {
+             doClose();
+         }
+     });
+ }
+
+ void doClose() throws JMSException {
+ dispose();
+ RemoveInfo removeCommand = info.createRemoveCommand();
+ removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+ this.session.asyncSendPacket(removeCommand);
+ }
+
+ /**
+  * Requests that the queued dispatch list be cleared.
+  * <P>
+  * This is called from inside the transport reconnection logic, which
+  * clears the dispatch lists of all the connection's consumers. Grabbing a
+  * mutex here could block on a message listener that is in the middle of a
+  * send, so only a flag is raised; the dispatch thread flushes the list as
+  * soon as it is ready to.
+  */
+ void clearMessagesInProgress() {
+     this.clearDispatchList = true;
+ }
+
+ /**
+ * Sends any outstanding standard acknowledgement to the broker on a
+ * background executor. The deliveryingAcknowledgements flag ensures only
+ * one such delivery is in flight; if another is already running this call
+ * does nothing.
+ */
+ void deliverAcks() {
+ MessageAck ack = null;
+ if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+ if (session.isAutoAcknowledge()) {
+ synchronized(deliveredMessages) {
+ // Auto-ack: acknowledge everything delivered so far in one ack.
+ ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack != null) {
+ deliveredMessages.clear();
+ ackCounter = 0;
+ }
+ }
+ } else if (pendingAck != null && pendingAck.isStandardAck()) {
+ // Otherwise only a pending standard ack is flushed.
+ ack = pendingAck;
+ pendingAck = null;
+ }
+ if (ack != null) {
+ final MessageAck ackToSend = ack;
+
+ // The executor is created lazily on first use.
+ if (executorService == null) {
+ executorService = Executors.newSingleThreadExecutor();
+ }
+ executorService.submit(new Runnable() {
+ public void run() {
+ try {
+ session.sendAck(ackToSend,true);
+ } catch (JMSException e) {
+ LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
+ } finally {
+ // Release the guard whether or not the send succeeded.
+ deliveryingAcknowledgements.set(false);
+ }
+ }
+ });
+ } else {
+ // Nothing to send: release the guard immediately.
+ deliveryingAcknowledgements.set(false);
+ }
+ }
+ }
+
+ /**
+ * Releases the local state of this consumer: flushes outstanding
+ * acknowledgements, shuts down the ack executor, rolls back duplicate
+ * tracking for messages that were never acknowledged or consumed, closes
+ * the unconsumed-message channel and removes the consumer from its session.
+ * Does nothing if the channel is already closed.
+ *
+ * @throws JMSException if sending the final acknowledgements fails
+ */
+ public void dispose() throws JMSException {
+ if (!unconsumedMessages.isClosed()) {
+
+ // Do we have any acks we need to send out before closing?
+ // Ack any delivered messages now.
+ if (!session.getTransacted()) {
+ deliverAcks();
+ if (session.isDupsOkAcknowledge()) {
+ acknowledge();
+ }
+ }
+ if (executorService != null) {
+ // Wait (up to 60 seconds) for a background ack submitted by deliverAcks().
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (session.isClientAcknowledge()) {
+ if (!this.info.isBrowser()) {
+ // rollback duplicates that aren't acknowledged
+ List<MessageDispatch> tmp = null;
+ synchronized (this.deliveredMessages) {
+ // Work on a snapshot so the lock is not held during the rollback calls.
+ tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
+ }
+ for (MessageDispatch old : tmp) {
+ this.session.connection.rollbackDuplicate(this, old.getMessage());
+ }
+ tmp.clear();
+ }
+ }
+ if (!session.isTransacted()) {
+ synchronized(deliveredMessages) {
+ deliveredMessages.clear();
+ }
+ }
+ List<MessageDispatch> list = unconsumedMessages.removeAll();
+ if (!this.info.isBrowser()) {
+ for (MessageDispatch old : list) {
+ // ensure we don't filter this as a duplicate
+ session.connection.rollbackDuplicate(this, old.getMessage());
+ }
+ }
+ unconsumedMessages.close();
+ this.session.removeConsumer(this);
+ }
+ }
+
+ /**
+ * @throws IllegalStateException
+ */
+ protected void checkClosed() throws IllegalStateException {
+ if (unconsumedMessages.isClosed()) {
+ throw new IllegalStateException("The Consumer is closed");
+ }
+ }
+
+ /**
+ * If we have a zero prefetch specified then send a pull command to the
+ * broker to pull a message we are about to receive
+ */
+ protected void sendPullCommand(long timeout) throws JMSException {
+ if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
+ MessagePull messagePull = new MessagePull();
+ messagePull.configure(info);
+ messagePull.setTimeout(timeout);
+ session.asyncSendPacket(messagePull);
+ }
+ }
+
+ /**
+  * Delegates to the session's message-listener check before a synchronous
+  * receive.
+  *
+  * @throws JMSException if the session rejects the synchronous receive
+  */
+ protected void checkMessageListener() throws JMSException {
+     this.session.checkMessageListener();
+ }
+
+ protected void setOptimizeAcknowledge(boolean value) {
+ if (optimizeAcknowledge && !value) {
+ deliverAcks();
+ }
+ optimizeAcknowledge = value;
+ }
+
+ /**
+  * Delivers outstanding acknowledgements and then records the new current
+  * prefetch size on the consumer info.
+  *
+  * @param prefetch the new current prefetch size
+  */
+ protected void setPrefetchSize(int prefetch) {
+     deliverAcks();
+     info.setCurrentPrefetchSize(prefetch);
+ }
+
+ /**
+  * Bookkeeping performed just before a message is handed to the client:
+  * stamps the delivery sequence id, remembers the broker sequence id and,
+  * unless the session uses dups-ok acknowledgement, records the dispatch as
+  * delivered. In a transacted session a delivered-ack is queued as well.
+  *
+  * @param md the dispatch about to be consumed
+  * @throws JMSException if queuing the delivered-ack fails
+  */
+ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
+     md.setDeliverySequenceId(session.getNextDeliveryId());
+     lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
+
+     if (session.isDupsOkAcknowledge()) {
+         return;
+     }
+
+     synchronized (deliveredMessages) {
+         // Newest delivery goes to the head of the list.
+         deliveredMessages.addFirst(md);
+     }
+     if (session.getTransacted()) {
+         ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+     }
+ }
+
+ /**
+  * Bookkeeping performed after a message has been consumed (or found to be
+  * expired): updates statistics and acknowledges the message according to
+  * the session's acknowledgement mode. Does nothing if the consumer is
+  * already closed.
+  *
+  * @param md the dispatch that was consumed
+  * @param messageExpired true if the message was expired and is being
+  *            discarded rather than delivered
+  * @throws JMSException if sending an acknowledgement fails or the session
+  *             is in an unknown acknowledgement mode
+  */
+ private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
+     if (unconsumedMessages.isClosed()) {
+         return;
+     }
+     if (messageExpired) {
+         synchronized (deliveredMessages) {
+             deliveredMessages.remove(md);
+         }
+         stats.getExpiredMessageCount().increment();
+         ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+     } else {
+         stats.onMessage();
+         if (session.getTransacted()) {
+             // Do nothing.
+         } else if (session.isAutoAcknowledge()) {
+             if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+                 try {
+                     synchronized (deliveredMessages) {
+                         if (!deliveredMessages.isEmpty()) {
+                             if (optimizeAcknowledge) {
+                                 // Batch acks: only send once 65% of the prefetch window is consumed.
+                                 ackCounter++;
+                                 if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
+                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                     if (ack != null) {
+                                         deliveredMessages.clear();
+                                         ackCounter = 0;
+                                         session.sendAck(ack);
+                                     }
+                                 }
+                             } else {
+                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                 if (ack != null) {
+                                     deliveredMessages.clear();
+                                     session.sendAck(ack);
+                                 }
+                             }
+                         }
+                     }
+                 } finally {
+                     // Always release the guard; if sendAck threw and the flag
+                     // stayed set, every later acknowledgement would be skipped.
+                     deliveryingAcknowledgements.set(false);
+                 }
+             }
+         } else if (session.isDupsOkAcknowledge()) {
+             ackLater(md, MessageAck.STANDARD_ACK_TYPE);
+         } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
+             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+         } else {
+             throw new IllegalStateException("Invalid session state.");
+         }
+     }
+ }
+
+ /**
+ * Creates a MessageAck for all messages contained in deliveredMessages.
+ * Caller should hold the lock for deliveredMessages.
+ *
+ * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
+ * @return <code>null</code> if nothing to ack.
+ */
+ private MessageAck makeAckForAllDeliveredMessages(byte type) {
+ synchronized (deliveredMessages) {
+ if (deliveredMessages.isEmpty())
+ return null;
+
+ MessageDispatch md = deliveredMessages.getFirst();
+ MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
+ ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+ return ack;
+ }
+ }
+
+ /**
+ * Records an acknowledgement for the given dispatch without necessarily
+ * sending it. The ack is folded into pendingAck, which is sent once the
+ * number of deliveries since the last flush reaches half the prefetch size.
+ * In a transacted session this also starts the transaction and registers a
+ * synchronization that acknowledges, commits or rolls back this consumer.
+ *
+ * @param md the dispatch to acknowledge
+ * @param ackType the MessageAck type to record
+ * @throws JMSException if starting the transaction or sending an ack fails
+ */
+ private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
+
+ // Don't acknowledge now, but we may need to let the broker know the
+ // consumer got the message to expand the pre-fetch window
+ if (session.getTransacted()) {
+ session.doStartTransaction();
+ if (!synchronizationRegistered) {
+ // Register only once per transaction; each callback clears the flag again.
+ synchronizationRegistered = true;
+ session.getTransactionContext().addSynchronization(new Synchronization() {
+ public void beforeEnd() throws Exception {
+ acknowledge();
+ synchronizationRegistered = false;
+ }
+
+ public void afterCommit() throws Exception {
+ commit();
+ synchronizationRegistered = false;
+ }
+
+ public void afterRollback() throws Exception {
+ rollback();
+ synchronizationRegistered = false;
+ }
+ });
+ }
+ }
+
+ deliveredCounter++;
+
+ MessageAck oldPendingAck = pendingAck;
+ pendingAck = new MessageAck(md, ackType, deliveredCounter);
+ pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+ if( oldPendingAck==null ) {
+ // First ack of a new range: it starts and ends at this message.
+ pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+ } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
+ // Same type: extend the existing range.
+ pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+ } else {
+ // The old pending ack is being superseded by an ack of another type. If it is not a
+ // delivered ack, and hence important, send it now so it is not lost.
+ if ( !oldPendingAck.isDeliveredAck()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+ }
+ session.sendAck(oldPendingAck);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+ }
+ }
+ }
+
+ // Flush once half of the prefetch window has been delivered since the last flush.
+ if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
+ session.sendAck(pendingAck);
+ pendingAck=null;
+ deliveredCounter = 0;
+ additionalWindowSize = 0;
+ }
+ }
+
+ /**
+  * Acknowledges, in a single ack, every message delivered to the client up
+  * to this point. In a transacted session the ack is tied to the current
+  * transaction and the delivered list is kept until the transaction ends.
+  *
+  * @throws JMSException if starting the transaction or sending the ack fails
+  */
+ public void acknowledge() throws JMSException {
+     synchronized (deliveredMessages) {
+         final MessageAck everythingSoFar = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+         if (everythingSoFar == null) {
+             // no msgs
+             return;
+         }
+
+         if (session.getTransacted()) {
+             session.doStartTransaction();
+             everythingSoFar.setTransactionId(session.getTransactionContext().getTransactionId());
+         }
+         session.sendAck(everythingSoFar);
+         pendingAck = null;
+
+         // Adjust the counters by the number of messages just acknowledged.
+         final int ackedCount = deliveredMessages.size();
+         deliveredCounter = Math.max(0, deliveredCounter - ackedCount);
+         additionalWindowSize = Math.max(0, additionalWindowSize - ackedCount);
+
+         if (!session.getTransacted()) {
+             deliveredMessages.clear();
+         }
+     }
+ }
+
+ void acknowledge(MessageDispatch md) throws JMSException {
+ MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
+ session.sendAck(ack);
+ synchronized(deliveredMessages){
+ deliveredMessages.remove(md);
+ }
+ }
+
+ /**
+  * Called after the session's transaction has committed: forgets all
+  * delivered messages and resets the redelivery delay.
+  *
+  * @throws JMSException declared for callers; not thrown by this implementation
+  */
+ public void commit() throws JMSException {
+     synchronized (this.deliveredMessages) {
+         this.deliveredMessages.clear();
+     }
+     this.redeliveryDelay = 0;
+ }
+
+ /**
+ * Rolls back every message that was delivered but not yet committed. If
+ * the redelivery limit has been exceeded, the messages are poison-acked so
+ * that the broker can move them to the DLQ. Otherwise they are put back at
+ * the front of the unconsumed queue and delivery is restarted, either
+ * immediately or after the current redelivery delay.
+ *
+ * @throws JMSException if sending an acknowledgement or restarting delivery fails
+ */
+ public void rollback() throws JMSException {
+ synchronized (unconsumedMessages.getMutex()) {
+ if (optimizeAcknowledge) {
+ // remove messages read but not acked at the broker yet through
+ // optimizeAcknowledge
+ if (!this.info.isBrowser()) {
+ synchronized(deliveredMessages) {
+ // NOTE(review): removeLast() shrinks size() while i grows, so this loop
+ // stops after roughly half of min(size, ackCounter) entries - confirm
+ // whether removing only that many is intended.
+ for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
+ // ensure we don't filter this as a duplicate
+ MessageDispatch md = deliveredMessages.removeLast();
+ session.connection.rollbackDuplicate(this, md.getMessage());
+ }
+ }
+ }
+ }
+ synchronized(deliveredMessages) {
+ if (deliveredMessages.isEmpty()) {
+ return;
+ }
+
+ // Only increase the redelivery delay after the first redelivery..
+ // (getFirst() is the most recently delivered message.)
+ MessageDispatch lastMd = deliveredMessages.getFirst();
+ final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
+ if (currentRedeliveryCount > 0) {
+ redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+ }
+ // getLast() is the oldest delivery and so the first id of the ack range.
+ MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
+
+ for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+ MessageDispatch md = iter.next();
+ md.getMessage().onMessageRolledBack();
+ // ensure we don't filter this as a duplicate
+ session.connection.rollbackDuplicate(this, md.getMessage());
+ }
+
+ if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+ && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
+ // We need to NACK the messages so that they get sent to the
+ // DLQ.
+ // Acknowledge the last message.
+
+ MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+ ack.setFirstMessageId(firstMsgId);
+ session.sendAck(ack,true);
+ // Adjust the window size.
+ additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
+ redeliveryDelay = 0;
+ } else {
+
+ // only redelivery_ack after first delivery
+ if (currentRedeliveryCount > 0) {
+ MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+ ack.setFirstMessageId(firstMsgId);
+ session.sendAck(ack,true);
+ }
+
+ // stop the delivery of messages.
+ unconsumedMessages.stop();
+
+ // Iterating newest-first and enqueuing each at the front leaves the
+ // oldest message at the head of the unconsumed queue.
+ for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+ MessageDispatch md = iter.next();
+ unconsumedMessages.enqueueFirst(md);
+ }
+
+ if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+ // Start up the delivery again a little later.
+ scheduler.executeAfterDelay(new Runnable() {
+ public void run() {
+ try {
+ // Restart only if the consumer is still marked as started.
+ if (started.get()) {
+ start();
+ }
+ } catch (JMSException e) {
+ session.connection.onAsyncException(e);
+ }
+ }
+ }, redeliveryDelay);
+ } else {
+ start();
+ }
+
+ }
+ deliveredCounter -= deliveredMessages.size();
+ deliveredMessages.clear();
+ }
+ }
+ // With a listener installed, hand the re-queued messages back to the session for dispatch.
+ if (messageListener.get() != null) {
+ session.redispatch(this, unconsumedMessages);
+ }
+ }
+
+ /**
+ * Accepts one message dispatch for this consumer.
+ * <p>
+ * Under the unconsumedMessages mutex:
+ * <ul>
+ * <li>if clearDispatchList is set, the flag is reset, all unconsumed messages
+ * are dropped (and, for non-browsers, un-marked as duplicates) and a pending
+ * delivered-ack is discarded;</li>
+ * <li>if the unconsumed queue is not closed and the message is not a
+ * duplicate (browsers skip the duplicate check), it is either handed
+ * straight to the message listener (listener set and queue running) or
+ * enqueued, notifying availableListener if one is set;</li>
+ * <li>a duplicate is acknowledged without being delivered.</li>
+ * </ul>
+ * Every 1000th call yields the current thread. Any exception escaping the
+ * above is passed to the connection via onClientInternalException.
+ *
+ * @param md the dispatch to process
+ */
+ public void dispatch(MessageDispatch md) {
+ // Read the listener once so the same instance is used for the whole call.
+ MessageListener listener = this.messageListener.get();
+ try {
+ synchronized (unconsumedMessages.getMutex()) {
+ if (clearDispatchList) {
+ // we are reconnecting so lets flush the in progress
+ // messages
+ clearDispatchList = false;
+ List<MessageDispatch> list = unconsumedMessages.removeAll();
+ if (!this.info.isBrowser()) {
+ for (MessageDispatch old : list) {
+ // ensure we don't filter this as a duplicate
+ session.connection.rollbackDuplicate(this, old.getMessage());
+ }
+ }
+ if (pendingAck != null && pendingAck.isDeliveredAck()) {
+ // on resumption a pending delivered ack will be out of sync with
+ // re deliveries.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
+ }
+ pendingAck = null;
+ }
+ }
+ if (!unconsumedMessages.isClosed()) {
+ if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
+ if (listener != null && unconsumedMessages.isRunning()) {
+ // Synchronous delivery to the listener on the calling thread.
+ ActiveMQMessage message = createActiveMQMessage(md);
+ beforeMessageIsConsumed(md);
+ try {
+ boolean expired = message.isExpired();
+ // Expired messages are not shown to the listener but still go
+ // through afterMessageIsConsumed with expired=true.
+ if (!expired) {
+ listener.onMessage(message);
+ }
+ afterMessageIsConsumed(md, expired);
+ } catch (RuntimeException e) {
+ if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
+ // Redeliver the message
+ // NOTE(review): this branch is empty; afterMessageIsConsumed is simply
+ // not called. Presumably redelivery relies on the missing ack - confirm.
+ } else {
+ // Transacted or Client ack: Deliver the
+ // next message.
+ afterMessageIsConsumed(md, false);
+ }
+ LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
+ }
+ } else {
+ // No listener (or delivery is stopped): queue the message for later.
+ unconsumedMessages.enqueue(md);
+ if (availableListener != null) {
+ availableListener.onMessageAvailable(this);
+ }
+ }
+ } else {
+ // ignore duplicate
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
+ }
+ // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
+ // the normal ack will happen in the transaction.
+ if (session.isTransacted()) {
+ ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ } else {
+ acknowledge(md);
+ }
+ }
+ }
+ }
+ // Yield once per 1000 dispatches; the counter is reset so it never grows
+ // beyond that.
+ if (++dispatchedCount % 1000 == 0) {
+ dispatchedCount = 0;
+ Thread.yield();
+ }
+ } catch (Exception e) {
+ session.connection.onClientInternalException(e);
+ }
+ }
+
+ /**
+  * @return the size currently reported by the unconsumed message queue
+  */
+ public int getMessageSize() {
+     final int pending = this.unconsumedMessages.size();
+     return pending;
+ }
+
+ /**
+  * Marks the consumer as started, starts the unconsumed message queue and
+  * wakes the session executor. Does nothing when the unconsumed queue has
+  * already been closed.
+  *
+  * @throws JMSException declared in the signature for callers
+  */
+ public void start() throws JMSException {
+     if (!unconsumedMessages.isClosed()) {
+         started.set(true);
+         unconsumedMessages.start();
+         session.executor.wakeup();
+     }
+ }
+
+ /**
+  * Clears the started flag and stops the unconsumed message queue.
+  */
+ public void stop() {
+     this.started.set(false);
+     this.unconsumedMessages.stop();
+ }
+
+ /**
+  * @return a short description containing the consumer id and the current
+  *         value of the started flag
+  */
+ public String toString() {
+     final StringBuilder text = new StringBuilder("ActiveMQMessageConsumer { value=");
+     text.append(info.getConsumerId());
+     text.append(", started=").append(started.get());
+     text.append(" }");
+     return text.toString();
+ }
+
+ /**
+  * Takes at most one message from the unconsumed queue, without waiting,
+  * and hands it to the message listener.
+  * <p>
+  * A {@link JMSException} raised while processing the message is reported
+  * to the connection and does not propagate to the caller.
+  *
+  * @return {@code true} if a message was dequeued (even if processing it
+  *         raised a {@code JMSException}); {@code false} if no listener is
+  *         set or no message was available
+  */
+ public boolean iterate() {
+     final MessageListener current = this.messageListener.get();
+     if (current == null) {
+         return false;
+     }
+     final MessageDispatch next = unconsumedMessages.dequeueNoWait();
+     if (next == null) {
+         return false;
+     }
+     try {
+         ActiveMQMessage message = createActiveMQMessage(next);
+         beforeMessageIsConsumed(next);
+         current.onMessage(message);
+         afterMessageIsConsumed(next, false);
+     } catch (JMSException e) {
+         session.connection.onClientInternalException(e);
+     }
+     return true;
+ }
+
+ /**
+  * @param destination the temporary destination to compare against
+  * @return whether this consumer's destination equals {@code destination}
+  */
+ public boolean isInUse(ActiveMQTempDestination destination) {
+     final boolean sameDestination = this.info.getDestination().equals(destination);
+     return sameDestination;
+ }
+
+ /**
+  * @return the current value of the lastDeliveredSequenceId field
+  */
+ public long getLastDeliveredSequenceId() {
+     return this.lastDeliveredSequenceId;
+ }
+
+ /**
+  * @return the failure error last stored via
+  *         {@link #setFailureError(IOException)}; may be {@code null}
+  */
+ public IOException getFailureError() {
+     return this.failureError;
+ }
+
+ /**
+  * Stores the given error so it can later be read back through
+  * {@link #getFailureError()}.
+  *
+  * @param failureError the error to store
+  */
+ public void setFailureError(IOException failureError) {
+     this.failureError = failureError;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.management.JMSProducerStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
+ * destination. A <CODE>MessageProducer</CODE> object is created by passing a
+ * <CODE>Destination</CODE> object to a message-producer creation method
+ * supplied by a session.
+ * <P>
+ * <CODE>MessageProducer</CODE> is the parent interface for all message
+ * producers.
+ * <P>
+ * A client also has the option of creating a message producer without supplying
+ * a destination. In this case, a destination must be provided with every send
+ * operation. A typical use for this kind of message producer is to send replies
+ * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
+ * <P>
+ * A client can specify a default delivery mode, priority, and time to live for
+ * messages sent by a message producer. It can also specify the delivery mode,
+ * priority, and time to live for an individual message.
+ * <P>
+ * A client can specify a time-to-live value in milliseconds for each message it
+ * sends. This value defines a message expiration time that is the sum of the
+ * message's time-to-live and the GMT when it is sent (for transacted sends,
+ * this is the time the client sends the message, not the time the transaction
+ * is committed).
+ * <P>
+ * A JMS provider should do its best to expire messages accurately; however, the
+ * JMS API does not define the accuracy provided.
+ *
+ * @version $Revision: 1.14 $
+ * @see javax.jms.TopicPublisher
+ * @see javax.jms.QueueSender
+ * @see javax.jms.Session#createProducer
+ */
+ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
+
+     /** Description of this producer that is sent to the broker. */
+     protected ProducerInfo info;
+     /** Set by {@link #dispose()}; consulted by {@link #checkClosed()}. */
+     protected boolean closed;
+
+     private JMSProducerStatsImpl stats;
+     private AtomicLong messageSequence;
+     private long startTime;
+     private MessageTransformer transformer;
+     /** Producer-window flow control; {@code null} when flow control is off. */
+     private MemoryUsage producerWindow;
+
+     /**
+      * Creates the producer, registers it with the session and announces it
+      * to the broker.
+      *
+      * @param session the owning session
+      * @param producerId the id for this producer
+      * @param destination the fixed destination, or {@code null} for a
+      *        producer that takes a destination on every send
+      * @param sendTimeout the send timeout handed to
+      *        {@link #setSendTimeout(int)}
+      * @throws JMSException propagated from the session calls
+      */
+     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
+         super(session);
+         this.info = new ProducerInfo(producerId);
+         this.info.setWindowSize(session.connection.getProducerWindowSize());
+         // Destination options prefixed with "producer." are applied to the
+         // ProducerInfo; a copy of the option map is handed over.
+         if (destination != null && destination.getOptions() != null) {
+             Map<String, String> options = new HashMap<String, String>(destination.getOptions());
+             IntrospectionSupport.setProperties(this.info, options, "producer.");
+         }
+         this.info.setDestination(destination);
+
+         // Enable producer window flow control if protocol >= 3 and the window
+         // size > 0
+         if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
+             producerWindow = new MemoryUsage("Producer Window: " + producerId);
+             producerWindow.setLimit(this.info.getWindowSize());
+             producerWindow.start();
+         }
+
+         this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
+         this.defaultPriority = Message.DEFAULT_PRIORITY;
+         this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
+         this.startTime = System.currentTimeMillis();
+         this.messageSequence = new AtomicLong(0);
+         this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
+         this.session.addProducer(this);
+         this.session.asyncSendPacket(info);
+         this.setSendTimeout(sendTimeout);
+         setTransformer(session.getTransformer());
+     }
+
+     /**
+      * @return the statistics object of this producer
+      */
+     public StatsImpl getStats() {
+         return stats;
+     }
+
+     /**
+      * @return the same statistics object as {@link #getStats()}, typed as
+      *         {@link JMSProducerStatsImpl}
+      */
+     public JMSProducerStatsImpl getProducerStats() {
+         return stats;
+     }
+
+     /**
+      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
+      *
+      * @return this producer's <CODE>Destination</CODE>
+      * @throws JMSException if the producer is closed
+      * @since 1.1
+      */
+     public Destination getDestination() throws JMSException {
+         checkClosed();
+         return this.info.getDestination();
+     }
+
+     /**
+      * Closes the message producer.
+      * <P>
+      * Since a provider may allocate some resources on behalf of a
+      * <CODE>MessageProducer</CODE> outside the Java virtual machine, clients
+      * should close them when they are not needed. Relying on garbage
+      * collection to eventually reclaim these resources may not be timely
+      * enough.
+      * <P>
+      * Calling this on an already closed producer does nothing.
+      *
+      * @throws JMSException if the JMS provider fails to close the producer due
+      *         to some internal error.
+      */
+     public void close() throws JMSException {
+         if (!closed) {
+             dispose();
+             this.session.asyncSendPacket(info.createRemoveCommand());
+         }
+     }
+
+     /**
+      * Releases local resources: unregisters from the session, stops the
+      * producer window (if any) and marks the producer closed. Unlike
+      * {@link #close()} this sends nothing to the broker.
+      */
+     public void dispose() {
+         if (!closed) {
+             this.session.removeProducer(this);
+             if (producerWindow != null) {
+                 producerWindow.stop();
+             }
+             closed = true;
+         }
+     }
+
+     /**
+      * Check if the instance of this producer has been closed.
+      *
+      * @throws IllegalStateException if the producer is closed
+      */
+     protected void checkClosed() throws IllegalStateException {
+         if (closed) {
+             throw new IllegalStateException("The producer is closed");
+         }
+     }
+
+     /**
+      * Sends a message to a destination, specifying delivery mode, priority
+      * and time to live.
+      * <P>
+      * Typically, a message producer is assigned a destination at creation time;
+      * however, the JMS API also supports unidentified message producers, which
+      * require that the destination be supplied every time a message is sent.
+      * <P>
+      * If producer-window flow control is enabled this call blocks until the
+      * window has space. If the waiting thread is interrupted, its interrupt
+      * status is restored and a {@link JMSException} linked to the
+      * {@link InterruptedException} is thrown.
+      *
+      * @param destination the destination to send this message to
+      * @param message the message to send
+      * @param deliveryMode the delivery mode to use
+      * @param priority the priority for this message
+      * @param timeToLive the message's lifetime (in milliseconds)
+      * @throws JMSException if the producer is closed, the send is
+      *         interrupted, or the session fails to send the message
+      * @throws UnsupportedOperationException if {@code destination} is
+      *         {@code null} and this producer has no destination of its own,
+      *         or if this producer has a fixed destination and
+      *         {@code destination} is a different object
+      * @throws InvalidDestinationException if {@code destination} is
+      *         {@code null} although this producer has a destination
+      * @see javax.jms.Session#createProducer
+      * @since 1.1
+      */
+     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+         checkClosed();
+         if (destination == null) {
+             if (info.getDestination() == null) {
+                 throw new UnsupportedOperationException("A destination must be specified.");
+             }
+             throw new InvalidDestinationException("Don't understand null destinations");
+         }
+
+         ActiveMQDestination dest;
+         // Identity comparison: only the producer's own destination object
+         // is accepted when the producer was created with one.
+         if (destination == info.getDestination()) {
+             dest = (ActiveMQDestination)destination;
+         } else if (info.getDestination() == null) {
+             dest = ActiveMQDestination.transform(destination);
+         } else {
+             throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
+         }
+         if (dest == null) {
+             throw new JMSException("No destination specified");
+         }
+
+         // A transformer may replace the message; a null result keeps the
+         // original message.
+         if (transformer != null) {
+             Message transformedMessage = transformer.producerTransform(session, this, message);
+             if (transformedMessage != null) {
+                 message = transformedMessage;
+             }
+         }
+
+         if (producerWindow != null) {
+             try {
+                 producerWindow.waitForSpace();
+             } catch (InterruptedException e) {
+                 // Restore the interrupt status so callers further up the
+                 // stack can still observe it, and keep the cause attached.
+                 Thread.currentThread().interrupt();
+                 JMSException aborted = new JMSException("Send aborted due to thread interrupt.");
+                 aborted.setLinkedException(e);
+                 aborted.initCause(e);
+                 throw aborted;
+             }
+         }
+
+         this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
+
+         stats.onMessage();
+     }
+
+     /**
+      * @return the transformer applied to messages before they are sent, or
+      *         {@code null} if none is set
+      */
+     public MessageTransformer getTransformer() {
+         return transformer;
+     }
+
+     /**
+      * Sets the transformer used to transform messages before they are sent on
+      * to the JMS bus
+      *
+      * @param transformer the transformer to use; {@code null} disables
+      *        transformation
+      */
+     public void setTransformer(MessageTransformer transformer) {
+         this.transformer = transformer;
+     }
+
+     /**
+      * @return the time in milliseconds when this object was created.
+      */
+     protected long getStartTime() {
+         return this.startTime;
+     }
+
+     /**
+      * Advances the message sequence; every call increments the counter.
+      *
+      * @return the next message sequence number
+      */
+     protected long getMessageSequence() {
+         return messageSequence.incrementAndGet();
+     }
+
+     /**
+      * @param messageSequence The messageSequence to set.
+      */
+     protected void setMessageSequence(AtomicLong messageSequence) {
+         this.messageSequence = messageSequence;
+     }
+
+     /**
+      * @return Returns the info; may be {@code null} if it was set to
+      *         {@code null} through {@link #setProducerInfo(ProducerInfo)}.
+      */
+     protected ProducerInfo getProducerInfo() {
+         return this.info;
+     }
+
+     /**
+      * @param info The info to set
+      */
+     protected void setProducerInfo(ProducerInfo info) {
+         this.info = info;
+     }
+
+     /**
+      * @return a short description containing the producer id
+      */
+     public String toString() {
+         return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
+     }
+
+     /**
+      * Handles a producer ack by reducing the producer window usage by the
+      * acknowledged size. Does nothing when flow control is disabled.
+      *
+      * @param pa the ack to apply
+      */
+     public void onProducerAck(ProducerAck pa) {
+         if (this.producerWindow != null) {
+             this.producerWindow.decreaseUsage(pa.getSize());
+         }
+     }
+
+ }
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * A useful base class for implementing a {@link MessageProducer}
+ *
+ * @version $Revision: $
+ */
+ public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
+     protected ActiveMQSession session;
+     protected boolean disableMessageID;
+     protected boolean disableMessageTimestamp;
+     protected int defaultDeliveryMode;
+     protected int defaultPriority;
+     protected long defaultTimeToLive;
+     protected int sendTimeout=0;
+
+     /**
+      * Binds the producer to its session and takes the initial timestamp
+      * setting from the session's connection.
+      *
+      * @param session the owning session
+      */
+     public ActiveMQMessageProducerSupport(ActiveMQSession session) {
+         this.session = session;
+         disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
+     }
+
+     /**
+      * Sets whether message IDs are disabled.
+      * <P>
+      * Since message IDs take some effort to create and increase a message's
+      * size, some JMS providers may be able to optimize message overhead if
+      * they are given a hint that the message ID is not used by an application.
+      * By calling the <CODE>setDisableMessageID</CODE> method on this message
+      * producer, a JMS client enables this potential optimization for all
+      * messages sent by this message producer. If the JMS provider accepts this
+      * hint, these messages must have the message ID set to null; if the
+      * provider ignores the hint, the message ID must be set to its normal
+      * unique value.
+      * <P>
+      * Message IDs are enabled by default.
+      *
+      * @param value indicates if message IDs are disabled
+      * @throws javax.jms.JMSException if the producer is closed
+      */
+     public void setDisableMessageID(boolean value) throws JMSException {
+         checkClosed();
+         this.disableMessageID = value;
+     }
+
+     /**
+      * Gets an indication of whether message IDs are disabled.
+      *
+      * @return an indication of whether message IDs are disabled
+      * @throws javax.jms.JMSException if the producer is closed
+      */
+     public boolean getDisableMessageID() throws JMSException {
+         checkClosed();
+         return this.disableMessageID;
+     }
+
+     /**
+      * Sets whether message timestamps are disabled.
+      * <P>
+      * Since timestamps take some effort to create and increase a message's
+      * size, some JMS providers may be able to optimize message overhead if
+      * they are given a hint that the timestamp is not used by an application.
+      * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
+      * message producer, a JMS client enables this potential optimization for
+      * all messages sent by this message producer. If the JMS provider accepts
+      * this hint, these messages must have the timestamp set to zero; if the
+      * provider ignores the hint, the timestamp must be set to its normal
+      * value.
+      * <P>
+      * The initial value comes from the connection's
+      * <CODE>isDisableTimeStampsByDefault()</CODE> setting.
+      *
+      * @param value indicates if message timestamps are disabled
+      * @throws javax.jms.JMSException if the producer is closed
+      */
+     public void setDisableMessageTimestamp(boolean value) throws JMSException {
+         checkClosed();
+         this.disableMessageTimestamp = value;
+     }
+
+     /**
+      * Gets an indication of whether message timestamps are disabled.
+      *
+      * @return an indication of whether message timestamps are disabled
+      * @throws javax.jms.JMSException if the producer is closed
+      */
+     public boolean getDisableMessageTimestamp() throws JMSException {
+         checkClosed();
+         return this.disableMessageTimestamp;
+     }
+
+     /**
+      * Sets the producer's default delivery mode.
+      * <P>
+      * The mode is validated before the closed check, so an illegal value is
+      * rejected even on a closed producer.
+      *
+      * @param newDeliveryMode the message delivery mode for this message producer; legal
+      *        values are <code>DeliveryMode.NON_PERSISTENT</code> and
+      *        <code>DeliveryMode.PERSISTENT</code>
+      * @throws javax.jms.JMSException if the delivery mode is not one of the
+      *         legal values, or if the producer is closed
+      * @see javax.jms.MessageProducer#getDeliveryMode
+      * @see javax.jms.DeliveryMode#NON_PERSISTENT
+      * @see javax.jms.DeliveryMode#PERSISTENT
+      * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
+      */
+     public void setDeliveryMode(int newDeliveryMode) throws JMSException {
+         if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
+             throw new IllegalStateException("unknown delivery mode: " + newDeliveryMode);
+         }
+         checkClosed();
+         this.defaultDeliveryMode = newDeliveryMode;
+     }
+
+     /**
+      * Gets the producer's default delivery mode.
+      *
+      * @return the message delivery mode for this message producer
+      * @throws javax.jms.JMSException if the producer is closed
+      */
+     public int getDeliveryMode() throws JMSException {
+         checkClosed();
+         return this.defaultDeliveryMode;
+     }
+
+     /**
+      * Sets the producer's default priority.
+      * <P>
+      * The JMS API defines ten levels of priority value, with 0 as the lowest
+      * priority and 9 as the highest. Clients should consider priorities 0-4 as
+      * gradations of normal priority and priorities 5-9 as gradations of
+      * expedited priority.
+      *
+      * @param newDefaultPriority the message priority for this message producer; must be a
+      *        value between 0 and 9
+      * @throws javax.jms.JMSException if the priority is outside 0-9, or if
+      *         the producer is closed
+      * @see javax.jms.MessageProducer#getPriority
+      * @see javax.jms.Message#DEFAULT_PRIORITY
+      */
+     public void setPriority(int newDefaultPriority) throws JMSException {
+         if (newDefaultPriority < 0 || newDefaultPriority > 9) {
+             throw new IllegalStateException("default priority must be a value between 0 and 9");
+         }
+         checkClosed();
+         this.defaultPriority = newDefaultPriority;
+     }
+
+     /**
+      * Gets the producer's default priority.
+      *
+      * @return the message priority for this message producer
+      * @throws javax.jms.JMSException if the producer is closed
+      * @see javax.jms.MessageProducer#setPriority
+      */
+     public int getPriority() throws JMSException {
+         checkClosed();
+         return this.defaultPriority;
+     }
+
+     /**
+      * Sets the default length of time in milliseconds from its dispatch time
+      * that a produced message should be retained by the message system.
+      *
+      * @param timeToLive the message time to live in milliseconds; zero is unlimited
+      * @throws javax.jms.JMSException if the value is negative, or if the
+      *         producer is closed
+      * @see javax.jms.MessageProducer#getTimeToLive
+      * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
+      */
+     public void setTimeToLive(long timeToLive) throws JMSException {
+         if (timeToLive < 0L) {
+             throw new IllegalStateException("cannot set a negative timeToLive");
+         }
+         checkClosed();
+         this.defaultTimeToLive = timeToLive;
+     }
+
+     /**
+      * Gets the default length of time in milliseconds from its dispatch time
+      * that a produced message should be retained by the message system.
+      *
+      * @return the message time to live in milliseconds; zero is unlimited
+      * @throws javax.jms.JMSException if the producer is closed
+      * @see javax.jms.MessageProducer#setTimeToLive
+      */
+     public long getTimeToLive() throws JMSException {
+         checkClosed();
+         return this.defaultTimeToLive;
+     }
+
+     /**
+      * Sends a message using the <CODE>MessageProducer</CODE>'s default
+      * delivery mode, priority, and time to live. Delegates to the
+      * five-argument <CODE>send</CODE> with this producer's own destination.
+      *
+      * @param message the message to send
+      * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+      *         internal error.
+      * @throws javax.jms.MessageFormatException if an invalid message is specified.
+      * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>
+      *         MessageProducer</CODE> with an invalid destination.
+      * @throws UnsupportedOperationException
+      *         if a client uses this method with a <CODE>
+      *         MessageProducer</CODE> that did not specify a
+      *         destination at creation time.
+      * @see javax.jms.Session#createProducer
+      * @see javax.jms.MessageProducer
+      * @since 1.1
+      */
+     public void send(Message message) throws JMSException {
+         this.send(this.getDestination(),
+                   message,
+                   this.defaultDeliveryMode,
+                   this.defaultPriority,
+                   this.defaultTimeToLive);
+     }
+
+     /**
+      * Sends a message to the destination, specifying delivery mode, priority,
+      * and time to live. Delegates to the five-argument <CODE>send</CODE>
+      * with this producer's own destination.
+      *
+      * @param message the message to send
+      * @param deliveryMode the delivery mode to use
+      * @param priority the priority for this message
+      * @param timeToLive the message's lifetime (in milliseconds)
+      * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+      *         internal error.
+      * @throws javax.jms.MessageFormatException if an invalid message is specified.
+      * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>
+      *         MessageProducer</CODE> with an invalid destination.
+      * @throws UnsupportedOperationException
+      *         if a client uses this method with a <CODE>
+      *         MessageProducer</CODE> that did not specify a
+      *         destination at creation time.
+      * @see javax.jms.Session#createProducer
+      * @since 1.1
+      */
+     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+         this.send(this.getDestination(),
+                   message,
+                   deliveryMode,
+                   priority,
+                   timeToLive);
+     }
+
+     /**
+      * Sends a message to a destination for an unidentified message producer.
+      * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
+      * priority, and time to live.
+      * <P>
+      * Typically, a message producer is assigned a destination at creation
+      * time; however, the JMS API also supports unidentified message producers,
+      * which require that the destination be supplied every time a message is
+      * sent.
+      *
+      * @param destination the destination to send this message to
+      * @param message the message to send
+      * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+      *         internal error.
+      * @throws javax.jms.MessageFormatException if an invalid message is specified.
+      * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid destination.
+      * @throws UnsupportedOperationException
+      *         if a client uses this method with a <CODE>
+      *         MessageProducer</CODE> that specified a destination at
+      *         creation time.
+      * @see javax.jms.Session#createProducer
+      * @see javax.jms.MessageProducer
+      */
+     public void send(Destination destination, Message message) throws JMSException {
+         this.send(destination,
+                   message,
+                   this.defaultDeliveryMode,
+                   this.defaultPriority,
+                   this.defaultTimeToLive);
+     }
+
+
+     /**
+      * Hook for subclasses to reject use of a closed producer; called at the
+      * start of the accessors in this class.
+      *
+      * @throws IllegalStateException if the producer is closed
+      */
+     protected abstract void checkClosed() throws IllegalStateException;
+
+     /**
+      * @return the sendTimeout
+      */
+     public int getSendTimeout() {
+         return sendTimeout;
+     }
+
+     /**
+      * @param sendTimeout the sendTimeout to set
+      */
+     public void setSendTimeout(int sendTimeout) {
+         this.sendTimeout = sendTimeout;
+     }
+ }
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * A helper class for converting standard JMS destinations and messages
+ * (possibly from another provider) into their ActiveMQ-specific
+ * implementations.
+ *
+ * @version $Revision: 1.1 $
+ */
+public final class ActiveMQMessageTransformation {
+
+ private ActiveMQMessageTransformation() {
+ }
+
+ /**
+ * Creates a an available JMS message from another provider.
+ *
+ * @param destination - Destination to be converted into ActiveMQ's
+ * implementation.
+ * @return ActiveMQDestination - ActiveMQ's implementation of the
+ * destination.
+ * @throws JMSException if an error occurs
+ */
+ public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
+ ActiveMQDestination activeMQDestination = null;
+
+ if (destination != null) {
+ if (destination instanceof ActiveMQDestination) {
+ return (ActiveMQDestination)destination;
+
+ } else {
+ if (destination instanceof TemporaryQueue) {
+ activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
+ } else if (destination instanceof TemporaryTopic) {
+ activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
+ } else if (destination instanceof Queue) {
+ activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
+ } else if (destination instanceof Topic) {
+ activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
+ }
+ }
+ }
+
+ return activeMQDestination;
+ }
+
+ /**
+ * Creates a fast shallow copy of the current ActiveMQMessage or creates a
+ * whole new message instance from an available JMS message from another
+ * provider.
+ *
+ * @param message - Message to be converted into ActiveMQ's implementation.
+ * @param connection
+ * @return ActiveMQMessage - ActiveMQ's implementation object of the
+ * message.
+ * @throws JMSException if an error occurs
+ */
+ public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
+ throws JMSException {
+ if (message instanceof ActiveMQMessage) {
+ return (ActiveMQMessage)message;
+
+ } else {
+ ActiveMQMessage activeMessage = null;
+
+ if (message instanceof BytesMessage) {
+ BytesMessage bytesMsg = (BytesMessage)message;
+ bytesMsg.reset();
+ ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+ msg.setConnection(connection);
+ try {
+ for (;;) {
+ // Reads a byte from the message stream until the stream
+ // is empty
+ msg.writeByte(bytesMsg.readByte());
+ }
+ } catch (MessageEOFException e) {
+ // if an end of message stream as expected
+ } catch (JMSException e) {
+ }
+
+ activeMessage = msg;
+ } else if (message instanceof MapMessage) {
+ MapMessage mapMsg = (MapMessage)message;
+ ActiveMQMapMessage msg = new ActiveMQMapMessage();
+ msg.setConnection(connection);
+ Enumeration iter = mapMsg.getMapNames();
+
+ while (iter.hasMoreElements()) {
+ String name = iter.nextElement().toString();
+ msg.setObject(name, mapMsg.getObject(name));
+ }
+
+ activeMessage = msg;
+ } else if (message instanceof ObjectMessage) {
+ ObjectMessage objMsg = (ObjectMessage)message;
+ ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
+ msg.setConnection(connection);
+ msg.setObject(objMsg.getObject());
+ msg.storeContent();
+ activeMessage = msg;
+ } else if (message instanceof StreamMessage) {
+ StreamMessage streamMessage = (StreamMessage)message;
+ streamMessage.reset();
+ ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
+ msg.setConnection(connection);
+ Object obj = null;
+
+ try {
+ while ((obj = streamMessage.readObject()) != null) {
+ msg.writeObject(obj);
+ }
+ } catch (MessageEOFException e) {
+ // if an end of message stream as expected
+ } catch (JMSException e) {
+ }
+
+ activeMessage = msg;
+ } else if (message instanceof TextMessage) {
+ TextMessage textMsg = (TextMessage)message;
+ ActiveMQTextMessage msg = new ActiveMQTextMessage();
+ msg.setConnection(connection);
+ msg.setText(textMsg.getText());
+ activeMessage = msg;
+ } else {
+ activeMessage = new ActiveMQMessage();
+ activeMessage.setConnection(connection);
+ }
+
+ copyProperties(message, activeMessage);
+
+ return activeMessage;
+ }
+ }
+
+ /**
+ * Copies the standard JMS and user defined properties from the givem
+ * message to the specified message
+ *
+ * @param fromMessage the message to take the properties from
+ * @param toMessage the message to add the properties to
+ * @throws JMSException
+ */
+ public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
+ toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
+ toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
+ toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
+ toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
+ toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+ toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
+ toMessage.setJMSType(fromMessage.getJMSType());
+ toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
+ toMessage.setJMSPriority(fromMessage.getJMSPriority());
+ toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
+
+ Enumeration propertyNames = fromMessage.getPropertyNames();
+
+ while (propertyNames.hasMoreElements()) {
+ String name = propertyNames.nextElement().toString();
+ Object obj = fromMessage.getObjectProperty(name);
+ toMessage.setObjectProperty(name, obj);
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
------------------------------------------------------------------------------
svn:executable = *