You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [5/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,1184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.management.JMSConsumerStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+ * from a destination. A <CODE> MessageConsumer</CODE> object is created by
+ * passing a <CODE>Destination</CODE> object to a message-consumer creation
+ * method supplied by a session.
+ * <P>
+ * <CODE>MessageConsumer</CODE> is the parent interface for all message
+ * consumers.
+ * <P>
+ * A message consumer can be created with a message selector. A message selector
+ * allows the client to restrict the messages delivered to the message consumer
+ * to those that match the selector.
+ * <P>
+ * A client may either synchronously receive a message consumer's messages or
+ * have the consumer asynchronously deliver them as they arrive.
+ * <P>
+ * For synchronous receipt, a client can request the next message from a message
+ * consumer using one of its <CODE> receive</CODE> methods. There are several
+ * variations of <CODE>receive</CODE> that allow a client to poll or wait for
+ * the next message.
+ * <P>
+ * For asynchronous delivery, a client can register a
+ * <CODE>MessageListener</CODE> object with a message consumer. As messages
+ * arrive at the message consumer, it delivers them by calling the
+ * <CODE>MessageListener</CODE>'s<CODE>
+ * onMessage</CODE> method.
+ * <P>
+ * It is a client programming error for a <CODE>MessageListener</CODE> to
+ * throw an exception.
+ * 
+ * @version $Revision: 1.22 $
+ * @see javax.jms.MessageConsumer
+ * @see javax.jms.QueueReceiver
+ * @see javax.jms.TopicSubscriber
+ * @see javax.jms.Session
+ */
+public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
+
+    private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
+    protected static final Scheduler scheduler = Scheduler.getInstance();
+    // Session that created this consumer; commands and acks are sent through it.
+    protected final ActiveMQSession session;
+    // Description of this consumer that the constructor sends to the broker.
+    protected final ConsumerInfo info;
+
+    // These are the messages waiting to be delivered to the client
+    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+
+    // These are the messages that were delivered to the consumer but that have
+    // not been acknowledged. The list is kept in reverse order (newest first)
+    // since we always walk the list in reverse order.
+    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
+    private int deliveredCounter;
+    private int additionalWindowSize;
+    private long redeliveryDelay;
+    // Number of messages consumed since the last batched ack when
+    // optimizeAcknowledge is enabled (see afterMessageIsConsumed).
+    private int ackCounter;
+    private int dispatchedCount;
+    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
+    private JMSConsumerStatsImpl stats;
+
+    // Selector in effect for this consumer, or null when none was supplied.
+    private final String selector;
+    private boolean synchronizationRegistered;
+    private AtomicBoolean started = new AtomicBoolean(false);
+
+    private MessageAvailableListener availableListener;
+
+    private RedeliveryPolicy redeliveryPolicy;
+    private boolean optimizeAcknowledge;
+    // Guard flag: set while one thread is building/sending acknowledgements.
+    private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
+    // Created lazily by deliverAcks() to send acks off the calling thread.
+    private ExecutorService executorService;
+    private MessageTransformer transformer;
+    // Set by clearMessagesInProgress(); the code that reads it is not in this part of the file.
+    private boolean clearDispatchList;
+
+    private MessageAck pendingAck;
+    // Broker sequence id of the last message handed to beforeMessageIsConsumed();
+    // reported to the broker in the remove command on close.
+    private long lastDeliveredSequenceId;
+
+    // When non-null, dequeue() throws this (wrapped) instead of returning null.
+    private IOException failureError;
+
+    /**
+     * Creates a message consumer, registers it with the session and announces
+     * it to the broker with a synchronous send of its {@link ConsumerInfo}.
+     * 
+     * @param session the session that owns this consumer
+     * @param consumerId the identifier assigned to this consumer
+     * @param dest the destination to consume from; must be non-null and have
+     *                a physical name. A temporary destination must belong to
+     *                the session's connection and must not have been deleted
+     * @param name the subscription name (used for durable consumers)
+     * @param selector optional message selector; null or blank means none, in
+     *                which case a selector set through the destination options
+     *                (prefix <code>consumer.</code>) is used if present
+     * @param prefetch the prefetch size; must not be negative
+     * @param maximumPendingMessageCount the maximum pending message limit
+     *                stored in the consumer info
+     * @param noLocal whether the noLocal flag is set on the consumer info
+     * @param browser whether this consumer is a browser
+     * @param dispatchAsync whether the dispatchAsync flag is set on the
+     *                consumer info
+     * @param messageListener optional listener to install immediately
+     * @throws InvalidDestinationException if the destination is null, has no
+     *                 physical name, or is an unusable temporary destination
+     * @throws JMSException if the prefetch is negative, the selector is
+     *                 invalid, or the broker rejects the consumer
+     */
+    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
+            String name, String selector, int prefetch,
+            int maximumPendingMessageCount, boolean noLocal, boolean browser,
+            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
+        if (dest == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        } else if (dest.getPhysicalName() == null) {
+            throw new InvalidDestinationException("The destination object was not given a physical name.");
+        } else if (dest.isTemporary()) {
+            // The physical name is known to be non-null here (checked above).
+            String physicalName = dest.getPhysicalName();
+            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
+
+            if (physicalName.indexOf(connectionID) < 0) {
+                throw new InvalidDestinationException(
+                                                      "Cannot use a Temporary destination from another Connection");
+            }
+
+            if (session.connection.isDeleted(dest)) {
+                throw new InvalidDestinationException(
+                                                      "Cannot use a Temporary destination that has been deleted");
+            }
+        }
+
+        // Validate the prefetch for every kind of destination. Previously this
+        // check was nested in the temporary-destination branch and was skipped
+        // for ordinary queues and topics.
+        if (prefetch < 0) {
+            throw new JMSException("Cannot have a prefetch size less than zero");
+        }
+
+        this.session = session;
+        this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
+        setTransformer(session.getTransformer());
+
+        this.info = new ConsumerInfo(consumerId);
+        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
+        this.info.setSubscriptionName(name);
+        this.info.setPrefetchSize(prefetch);
+        this.info.setCurrentPrefetchSize(prefetch);
+        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
+        this.info.setNoLocal(noLocal);
+        this.info.setDispatchAsync(dispatchAsync);
+        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
+        this.info.setSelector(null);
+
+        // Allows the options on the destination to configure the consumerInfo
+        if (dest.getOptions() != null) {
+            Map<String, String> options = new HashMap<String, String>(dest.getOptions());
+            IntrospectionSupport.setProperties(this.info, options, "consumer.");
+        }
+
+        this.info.setDestination(dest);
+        this.info.setBrowser(browser);
+        try {
+            if (selector != null && selector.trim().length() != 0) {
+                // An explicit selector wins; validate it before using it.
+                SelectorParser.parse(selector);
+                this.info.setSelector(selector);
+                this.selector = selector;
+            } else if (info.getSelector() != null) {
+                // Selector supplied through the destination options; validate it.
+                SelectorParser.parse(this.info.getSelector());
+                this.selector = this.info.getSelector();
+            } else {
+                this.selector = null;
+            }
+        } catch (FilterException e) {
+            throw JMSExceptionSupport.createInvalidSelectorException(e);
+        }
+
+        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
+        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
+                                   && !info.isBrowser();
+        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
+
+        if (messageListener != null) {
+            setMessageListener(messageListener);
+        }
+        try {
+            this.session.addConsumer(this);
+            this.session.syncSendPacket(info);
+        } catch (JMSException e) {
+            // Undo the registration if the broker did not accept the consumer.
+            this.session.removeConsumer(this);
+            throw e;
+        }
+
+        if (session.connection.isStarted()) {
+            start();
+        }
+    }
+
+    /**
+     * @return the statistics object of this consumer, typed as the generic
+     *         {@link StatsImpl}
+     */
+    public StatsImpl getStats() {
+        return this.stats;
+    }
+
+    /**
+     * @return the consumer-specific statistics object of this consumer
+     */
+    public JMSConsumerStatsImpl getConsumerStats() {
+        return this.stats;
+    }
+
+    /**
+     * @return the redelivery policy currently held by this consumer
+     */
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return this.redeliveryPolicy;
+    }
+
+    /**
+     * Replaces the redelivery policy used when messages are redelivered.
+     * 
+     * @param redeliveryPolicy the policy to store on this consumer
+     */
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        final RedeliveryPolicy replacement = redeliveryPolicy;
+        this.redeliveryPolicy = replacement;
+    }
+
+    /**
+     * @return the message transformer held by this consumer, possibly null
+     */
+    public MessageTransformer getTransformer() {
+        return this.transformer;
+    }
+
+    /**
+     * Stores the transformer that createActiveMQMessage applies (through
+     * <code>consumerTransform</code>) to a received message before it is
+     * returned.
+     * 
+     * @param transformer the transformer to use; null disables transformation
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        final MessageTransformer replacement = transformer;
+        this.transformer = replacement;
+    }
+
+    /**
+     * @return the consumer id stored in this consumer's {@link ConsumerInfo}
+     */
+    public ConsumerId getConsumerId() {
+        final ConsumerId id = this.info.getConsumerId();
+        return id;
+    }
+
+    /**
+     * @return the subscription name of this consumer - used for durable
+     *         consumers
+     */
+    public String getConsumerName() {
+        final String subscriptionName = info.getSubscriptionName();
+        return subscriptionName;
+    }
+
+    /**
+     * Reports the noLocal flag of the consumer info.
+     * 
+     * @return true if this consumer does not accept locally produced messages
+     */
+    protected boolean isNoLocal() {
+        return this.info.isNoLocal();
+    }
+
+    /**
+     * Reports the browser flag of the consumer info.
+     * 
+     * @return true if this consumer is a browser
+     */
+    protected boolean isBrowser() {
+        return this.info.isBrowser();
+    }
+
+    /**
+     * @return ActiveMQDestination
+     */
+    protected ActiveMQDestination getDestination() {
+        return info.getDestination();
+    }
+
+    /**
+     * @return the prefetch size stored in this consumer's {@link ConsumerInfo}
+     */
+    public int getPrefetchNumber() {
+        final int prefetchSize = this.info.getPrefetchSize();
+        return prefetchSize;
+    }
+
+    /**
+     * A consumer counts as a durable subscriber when it has a subscription
+     * name and its destination is a topic.
+     * 
+     * @return true if this is a durable topic subscriber
+     */
+    public boolean isDurableSubscriber() {
+        if (info.getSubscriptionName() == null) {
+            return false;
+        }
+        return info.getDestination().isTopic();
+    }
+
+    /**
+     * Returns the message selector expression of this consumer.
+     * 
+     * @return the selector in effect, or null if no selector exists for this
+     *         consumer (that is, none was set, or it was set to null or the
+     *         empty string)
+     * @throws JMSException if this consumer has been closed
+     */
+    public String getMessageSelector() throws JMSException {
+        this.checkClosed();
+        return this.selector;
+    }
+
+    /**
+     * Returns the <CODE>MessageListener</CODE> currently installed on this
+     * consumer.
+     * 
+     * @return the installed listener, or null if no listener is set
+     * @throws JMSException if this consumer has been closed
+     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
+     */
+    public MessageListener getMessageListener() throws JMSException {
+        checkClosed();
+        final MessageListener current = messageListener.get();
+        return current;
+    }
+
+    /**
+     * Sets the message consumer's <CODE>MessageListener</CODE>.
+     * <P>
+     * Setting the message listener to null is the equivalent of unsetting the
+     * message listener for the message consumer.
+     * <P>
+     * When a non-null listener is installed, a running session is stopped,
+     * the messages already queued for this consumer are redispatched through
+     * the session, and the session is started again. The restart happens even
+     * if the redispatch fails, so a failure cannot leave the session stopped.
+     * <P>
+     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
+     * while messages are being consumed by an existing listener or the consumer
+     * is being used to consume messages synchronously is undefined.
+     * 
+     * @param listener the listener to which the messages are to be delivered
+     * @throws JMSException if the consumer is closed, if the prefetch size is
+     *                 zero, or if stopping, redispatching or restarting the
+     *                 session fails
+     * @see javax.jms.MessageConsumer#getMessageListener
+     */
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        checkClosed();
+        if (info.getPrefetchSize() == 0) {
+            throw new JMSException(
+                                   "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
+        }
+        if (listener != null) {
+            boolean wasRunning = session.isRunning();
+            if (wasRunning) {
+                session.stop();
+            }
+
+            try {
+                this.messageListener.set(listener);
+                session.redispatch(this, unconsumedMessages);
+            } finally {
+                // Always undo the stop() above, even when redispatch throws.
+                if (wasRunning) {
+                    session.start();
+                }
+            }
+        } else {
+            this.messageListener.set(null);
+        }
+    }
+
+    /**
+     * @return the listener notified about available messages, or null if none
+     *         has been set
+     */
+    public MessageAvailableListener getAvailableListener() {
+        return this.availableListener;
+    }
+
+    /**
+     * Stores the listener used to notify synchronous consumers that a message
+     * is available, so that {@link MessageConsumer#receiveNoWait()} can be
+     * called.
+     * 
+     * @param availableListener the listener to store; may be null
+     */
+    public void setAvailableListener(MessageAvailableListener availableListener) {
+        final MessageAvailableListener replacement = availableListener;
+        this.availableListener = replacement;
+    }
+
+    /**
+     * Takes the next usable message from the unconsumedMessages channel. How
+     * long this method blocks depends on the timeout value:
+     * <ul>
+     * <li>timeout==-1: blocks until a message is received;</li>
+     * <li>timeout==0: tries not to block at all and returns a message only if
+     * one is available;</li>
+     * <li>timeout&gt;0: blocks for up to timeout milliseconds in total.</li>
+     * </ul>
+     * Expired messages are consumed (and acknowledged) by this method and are
+     * never returned; the remaining wait time is recomputed after each one.
+     * 
+     * @param timeout the wait policy described above
+     * @throws JMSException if the wait is interrupted, if failureError is set
+     *                 when no message could be obtained, or if consuming an
+     *                 expired message fails
+     * @return the next non-expired dispatch, or null if we timeout, if the
+     *         consumer is closed, or if a dispatch without a message is taken
+     */
+    private MessageDispatch dequeue(long timeout) throws JMSException {
+        try {
+            // Absolute deadline, only meaningful for a positive timeout.
+            long deadline = 0;
+            if (timeout > 0) {
+                deadline = System.currentTimeMillis() + timeout;
+            }
+            while (true) {
+                MessageDispatch md = unconsumedMessages.dequeue(timeout);
+                if (md == null) {
+                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
+                        // Nothing yet: retry with whatever time is left. Once
+                        // this reaches 0 the next pass ends in the else branch.
+                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+                    } else {
+                    	if (failureError != null) {
+                    		throw JMSExceptionSupport.create(failureError);
+                    	} else {
+                    		return null;
+                    	}
+                    }
+                } else if (md.getMessage() == null) {
+                    // NOTE(review): presumably a marker dispatch from the broker
+                    // (e.g. a pull that timed out) - confirm.
+                    return null;
+                } else if (md.getMessage().isExpired()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(getConsumerId() + " received expired message: " + md);
+                    }
+                    // Run the normal consume bookkeeping with the expired flag
+                    // set, then keep looking for a live message.
+                    beforeMessageIsConsumed(md);
+                    afterMessageIsConsumed(md, true);
+                    if (timeout > 0) {
+                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(getConsumerId() + " received message: " + md);
+                    }
+                    return md;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status before converting to a JMSException.
+            Thread.currentThread().interrupt();
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Receives the next message produced for this message consumer.
+     * <P>
+     * This call blocks indefinitely until a message is produced or until this
+     * message consumer is closed.
+     * <P>
+     * If this <CODE>receive</CODE> is done within a transaction, the consumer
+     * retains the message until the transaction commits.
+     * 
+     * @return the next message produced for this message consumer, or null if
+     *         this message consumer is concurrently closed
+     * @throws JMSException if the consumer is already closed, the session's
+     *                 message listener check fails, or receiving fails
+     */
+    public Message receive() throws JMSException {
+        checkClosed();
+        checkMessageListener();
+
+        sendPullCommand(0);
+        final MessageDispatch dispatch = dequeue(-1);
+        if (dispatch != null) {
+            beforeMessageIsConsumed(dispatch);
+            afterMessageIsConsumed(dispatch, false);
+            return createActiveMQMessage(dispatch);
+        }
+        return null;
+    }
+
+    /**
+     * Builds the message object handed to the client from a dispatch: copies
+     * the dispatched message, applies the transformer when one is set, and
+     * attaches an acknowledge callback for client-acknowledge and
+     * individual-acknowledge sessions.
+     * 
+     * @param md the dispatch whose message is to be returned
+     * @return the copied (and possibly transformed) message
+     * @throws JMSException if the transformation fails
+     */
+    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
+        ActiveMQMessage result = (ActiveMQMessage)md.getMessage().copy();
+        if (transformer != null) {
+            Message transformed = transformer.consumerTransform(session, this, result);
+            if (transformed != null) {
+                result = ActiveMQMessageTransformation.transformMessage(transformed, session.connection);
+            }
+        }
+
+        // Choose the callback first, then attach it in one place.
+        Callback ackCallback = null;
+        if (session.isClientAcknowledge()) {
+            // Client acknowledge: acknowledging one message acknowledges the session.
+            ackCallback = new Callback() {
+                public void execute() throws Exception {
+                    session.checkClosed();
+                    session.acknowledge();
+                }
+            };
+        } else if (session.isIndividualAcknowledge()) {
+            // Individual acknowledge: only this dispatch is acknowledged.
+            ackCallback = new Callback() {
+                public void execute() throws Exception {
+                    session.checkClosed();
+                    acknowledge(md);
+                }
+            };
+        }
+        if (ackCallback != null) {
+            result.setAcknowledgeCallback(ackCallback);
+        }
+        return result;
+    }
+
+    /**
+     * Receives the next message that arrives within the specified timeout
+     * interval.
+     * <P>
+     * This call blocks until a message arrives, the timeout expires, or this
+     * message consumer is closed. A <CODE>timeout</CODE> of zero never
+     * expires, and the call blocks indefinitely. For a negative timeout the
+     * pull command is still issued but the call returns null without waiting.
+     * 
+     * @param timeout the timeout value (in milliseconds), a time out of zero
+     *                never expires.
+     * @return the next message produced for this message consumer, or null if
+     *         the timeout expires or this message consumer is concurrently
+     *         closed
+     * @throws JMSException if the consumer is already closed, the session's
+     *                 message listener check fails, or receiving fails
+     */
+    public Message receive(long timeout) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        if (timeout == 0) {
+            return this.receive();
+        }
+
+        sendPullCommand(timeout);
+        if (timeout <= 0) {
+            return null;
+        }
+
+        // With a zero prefetch we wait without a local limit: we let the
+        // broker let us know when we timeout.
+        final long wait = info.getPrefetchSize() == 0 ? -1 : timeout;
+        final MessageDispatch dispatch = dequeue(wait);
+        if (dispatch == null) {
+            return null;
+        }
+
+        beforeMessageIsConsumed(dispatch);
+        afterMessageIsConsumed(dispatch, false);
+        return createActiveMQMessage(dispatch);
+    }
+
+    /**
+     * Receives the next message if one is immediately available.
+     * 
+     * @return the next message produced for this message consumer, or null if
+     *         one is not available
+     * @throws JMSException if the JMS provider fails to receive the next
+     *                 message due to some internal error.
+     */
+    public Message receiveNoWait() throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        sendPullCommand(-1);
+
+        // With a zero prefetch we wait without a local limit: we let the
+        // broker let us know when we timeout. Otherwise poll without blocking.
+        final long wait = info.getPrefetchSize() == 0 ? -1 : 0;
+        final MessageDispatch dispatch = dequeue(wait);
+        if (dispatch == null) {
+            return null;
+        }
+
+        beforeMessageIsConsumed(dispatch);
+        afterMessageIsConsumed(dispatch, false);
+        return createActiveMQMessage(dispatch);
+    }
+
+    /**
+     * Closes the message consumer.
+     * <P>
+     * Since a provider may allocate some resources on behalf of a <CODE>
+     * MessageConsumer</CODE>
+     * outside the Java virtual machine, clients should close them when they are
+     * not needed. Relying on garbage collection to eventually reclaim these
+     * resources may not be timely enough.
+     * <P>
+     * Calling this on an already closed consumer does nothing. While the
+     * session is inside a transaction the actual close is deferred until that
+     * transaction commits or rolls back.
+     * <P>
+     * A blocked message consumer <CODE>receive </CODE> call returns null when
+     * this message consumer is closed.
+     * 
+     * @throws JMSException if the JMS provider fails to close the consumer due
+     *                 to some internal error.
+     */
+    public void close() throws JMSException {
+        if (unconsumedMessages.isClosed()) {
+            return;
+        }
+        if (!session.getTransactionContext().isInTransaction()) {
+            doClose();
+            return;
+        }
+        // In a transaction: close once its outcome is known, either way.
+        session.getTransactionContext().addSynchronization(new Synchronization() {
+            public void afterCommit() throws Exception {
+                doClose();
+            }
+
+            public void afterRollback() throws Exception {
+                doClose();
+            }
+        });
+    }
+
+    void doClose() throws JMSException {
+        dispose();
+        RemoveInfo removeCommand = info.createRemoveCommand();
+        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+        this.session.asyncSendPacket(removeCommand);
+    }
+    
+    /**
+     * Requests that the dispatch list be cleared. Only a flag is raised here;
+     * nothing is cleared by this method itself.
+     */
+    void clearMessagesInProgress() {
+        // This is invoked from the transport reconnection logic, which clears
+        // the dispatch lists of all the connection's consumers. Grabbing a
+        // mutex here could block, because it may already be owned by a
+        // message listener that is calling send. So we only raise a flag and
+        // let the dispatch thread flush the list as soon as it is ready.
+        this.clearDispatchList = true;
+    }
+
+    /**
+     * Sends any outstanding standard acknowledgement to the broker on a
+     * background thread. Does nothing if another thread is already delivering
+     * acknowledgements (the deliveryingAcknowledgements flag is set).
+     * <P>
+     * For auto-acknowledge sessions one ack covering everything in
+     * deliveredMessages is built and the list is cleared; otherwise a pending
+     * standard ack, if any, is taken. Failures while sending are logged, not
+     * thrown.
+     */
+    void deliverAcks() {
+        MessageAck ack = null;
+        // Only one thread at a time may build and send acks.
+        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+            if (session.isAutoAcknowledge()) {
+                synchronized(deliveredMessages) {
+                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                    if (ack != null) {
+                        deliveredMessages.clear();
+                        ackCounter = 0;
+            		}
+            	}
+            } else if (pendingAck != null && pendingAck.isStandardAck()) {
+                ack = pendingAck;
+                pendingAck = null;
+            }
+            if (ack != null) {
+                final MessageAck ackToSend = ack;
+                
+                // The executor is created on first use.
+                if (executorService == null) {
+                    executorService = Executors.newSingleThreadExecutor();
+                }
+                executorService.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            session.sendAck(ackToSend,true);
+                        } catch (JMSException e) {
+                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
+                        } finally {
+                            // Release the guard only once the send has finished.
+                            deliveryingAcknowledgements.set(false);
+                        }
+                    }
+                });
+            } else {
+                // Nothing to send: release the guard immediately.
+                deliveryingAcknowledgements.set(false);
+            }
+        }
+    }
+
+    /**
+     * Releases the local state of this consumer without telling the broker
+     * (doClose() sends the remove command). Does nothing if the consumer is
+     * already closed.
+     * <P>
+     * In order: outstanding acks are delivered for non-transacted sessions,
+     * the ack executor (if any) is shut down and awaited for up to 60
+     * seconds, unacknowledged and unconsumed messages are rolled back as
+     * duplicates on the connection (not for browsers), the unconsumed channel
+     * is closed and the consumer is removed from its session.
+     * 
+     * @throws JMSException if acknowledging fails
+     */
+    public void dispose() throws JMSException {
+        if (!unconsumedMessages.isClosed()) {
+            
+            // Do we have any acks we need to send out before closing?
+            // Ack any delivered messages now.
+            if (!session.getTransacted()) { 
+                deliverAcks();
+                if (session.isDupsOkAcknowledge()) {
+                    acknowledge();
+                }
+            }
+            if (executorService != null) {
+                // Let acks that were already submitted finish, waiting up to 60 seconds.
+                executorService.shutdown();
+                try {
+                    executorService.awaitTermination(60, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt status and carry on disposing.
+                    Thread.currentThread().interrupt();
+                }
+            }
+            
+            if (session.isClientAcknowledge()) {
+                if (!this.info.isBrowser()) {
+                    // rollback duplicates that aren't acknowledged
+                    List<MessageDispatch> tmp = null;
+                    // Copy under the lock, then call the connection outside it.
+                    synchronized (this.deliveredMessages) {
+                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
+                    }
+                    for (MessageDispatch old : tmp) {
+                        this.session.connection.rollbackDuplicate(this, old.getMessage());
+                    }
+                    tmp.clear();
+                }
+            }
+            // The delivered list is left untouched for transacted sessions.
+            if (!session.isTransacted()) {
+                synchronized(deliveredMessages) {
+                    deliveredMessages.clear();
+                }
+            }
+            List<MessageDispatch> list = unconsumedMessages.removeAll();
+            if (!this.info.isBrowser()) {
+                for (MessageDispatch old : list) {
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this, old.getMessage());
+                }
+            }
+            unconsumedMessages.close();
+            this.session.removeConsumer(this);
+        }
+    }
+
+    /**
+     * @throws IllegalStateException
+     */
+    protected void checkClosed() throws IllegalStateException {
+        if (unconsumedMessages.isClosed()) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    /**
+     * If we have a zero prefetch specified then send a pull command to the
+     * broker to pull a message we are about to receive
+     */
+    protected void sendPullCommand(long timeout) throws JMSException {
+        if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
+            MessagePull messagePull = new MessagePull();
+            messagePull.configure(info);
+            messagePull.setTimeout(timeout);
+            session.asyncSendPacket(messagePull);
+        }
+    }
+
+    /**
+     * Delegates the message listener check to the owning session.
+     * 
+     * @throws JMSException if the session's check fails
+     */
+    protected void checkMessageListener() throws JMSException {
+        this.session.checkMessageListener();
+    }
+
+    protected void setOptimizeAcknowledge(boolean value) {
+        if (optimizeAcknowledge && !value) {
+            deliverAcks();
+        }
+        optimizeAcknowledge = value;
+    }
+
+    /**
+     * Delivers outstanding acks and then records the new current prefetch
+     * size in the consumer info.
+     * 
+     * @param prefetch the new current prefetch size
+     */
+    protected void setPrefetchSize(int prefetch) {
+        this.deliverAcks();
+        info.setCurrentPrefetchSize(prefetch);
+    }
+
+    /**
+     * Bookkeeping done just before a dispatch is handed to the client: stamps
+     * the delivery sequence id, remembers the broker sequence id, and (except
+     * for dups-ok sessions) records the dispatch as delivered. For transacted
+     * sessions a delivered-ack is also queued.
+     * 
+     * @param md the dispatch about to be consumed
+     * @throws JMSException if queueing the delivered-ack fails
+     */
+    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
+        md.setDeliverySequenceId(session.getNextDeliveryId());
+        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
+        if (session.isDupsOkAcknowledge()) {
+            return;
+        }
+        synchronized (deliveredMessages) {
+            // Newest first: the list is kept in reverse delivery order.
+            deliveredMessages.addFirst(md);
+        }
+        if (session.getTransacted()) {
+            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+        }
+    }
+
+    /**
+     * Bookkeeping done after a dispatch has been consumed; acknowledges it
+     * according to the session's acknowledgement mode. Does nothing if the
+     * consumer has been closed in the meantime.
+     * <ul>
+     * <li>Expired message: removed from deliveredMessages, counted as expired
+     * and queued as a delivered-ack.</li>
+     * <li>Transacted session: nothing is done here.</li>
+     * <li>Auto acknowledge: one standard ack is sent for everything in
+     * deliveredMessages; with optimizeAcknowledge only once the ack counter
+     * reaches 65% of the current prefetch size.</li>
+     * <li>Dups-ok: a standard ack is queued.</li>
+     * <li>Client or individual acknowledge: a delivered-ack is queued.</li>
+     * </ul>
+     * 
+     * @param md the dispatch that was consumed
+     * @param messageExpired true if the message was dropped because it expired
+     * @throws JMSException if sending or queueing an ack fails, or if the
+     *                 session is in none of the known acknowledgement modes
+     */
+    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
+        if (unconsumedMessages.isClosed()) {
+            return;
+        }
+        if (messageExpired) {
+            synchronized (deliveredMessages) {
+                deliveredMessages.remove(md);
+            }
+            stats.getExpiredMessageCount().increment();
+            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+        } else {
+            stats.onMessage();
+            if (session.getTransacted()) {
+                // Do nothing.
+            } else if (session.isAutoAcknowledge()) {
+                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+                    try {
+                        synchronized (deliveredMessages) {
+                            if (!deliveredMessages.isEmpty()) {
+                                if (optimizeAcknowledge) {
+                                    ackCounter++;
+                                    if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
+                                        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                        if (ack != null) {
+                                            deliveredMessages.clear();
+                                            ackCounter = 0;
+                                            session.sendAck(ack);
+                                        }
+                                    }
+                                } else {
+                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                    if (ack != null) {
+                                        deliveredMessages.clear();
+                                        session.sendAck(ack);
+                                    }
+                                }
+                            }
+                        }
+                    } finally {
+                        // Release the guard even when sendAck throws; otherwise
+                        // no later call could ever acknowledge again.
+                        deliveryingAcknowledgements.set(false);
+                    }
+                }
+            } else if (session.isDupsOkAcknowledge()) {
+                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
+            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
+                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+            } else {
+                throw new IllegalStateException("Invalid session state.");
+            }
+        }
+    }
+
+    /**
+     * Creates a MessageAck for all messages contained in deliveredMessages.
+     * Caller should hold the lock for deliveredMessages.
+     * 
+     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
+     * @return <code>null</code> if nothing to ack.
+     */
+	private MessageAck makeAckForAllDeliveredMessages(byte type) {
+		synchronized (deliveredMessages) {
+			if (deliveredMessages.isEmpty())
+				return null;
+			    
+			MessageDispatch md = deliveredMessages.getFirst();
+		    MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
+		    ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+		    return ack;
+		}
+	}
+
+    /**
+     * Records the given dispatch as the newest pending ack instead of sending
+     * an ack for it right away.
+     * <p>
+     * In a transacted session the transaction is started and, if not already
+     * done, a Synchronization is registered that calls acknowledge() before
+     * the transaction ends, commit() after a commit and rollback() after a
+     * rollback. The pending ack is sent to the broker once
+     * (deliveredCounter - additionalWindowSize) reaches half of
+     * info.getPrefetchSize(); both counters are then reset.
+     *
+     * @param md the dispatch the pending ack should end at
+     * @param ackType the ack type for the new pending ack
+     * @throws JMSException if starting the transaction or sending an ack fails
+     */
+    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
+
+        // Don't acknowledge now, but we may need to let the broker know the
+        // consumer got the message to expand the pre-fetch window
+        if (session.getTransacted()) {
+            session.doStartTransaction();
+            // register the transaction callbacks only once until one of them has run
+            if (!synchronizationRegistered) {
+                synchronizationRegistered = true;
+                session.getTransactionContext().addSynchronization(new Synchronization() {
+                    public void beforeEnd() throws Exception {
+                        acknowledge();
+                        synchronizationRegistered = false;
+                    }
+
+                    public void afterCommit() throws Exception {
+                        commit();
+                        synchronizationRegistered = false;
+                    }
+
+                    public void afterRollback() throws Exception {
+                        rollback();
+                        synchronizationRegistered = false;
+                    }
+                });
+            }
+        }
+
+        deliveredCounter++;
+        
+        // Replace the pending ack with one ending at md and counting deliveredCounter messages.
+        MessageAck oldPendingAck = pendingAck;
+        pendingAck = new MessageAck(md, ackType, deliveredCounter);
+        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+        if( oldPendingAck==null ) {
+            // first pending ack: the range starts and ends at this message
+            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
+            // same type: extend the existing range by keeping its first message id
+            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+        } else {
+            // old pending ack being superseded by ack of another type, if is is not a delivered
+            // ack and hence important, send it now so it is not lost.
+            // NOTE(review): in this branch the new pendingAck never gets a
+            // firstMessageId set here - confirm MessageAck handles that.
+            if ( !oldPendingAck.isDeliveredAck()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                }
+                session.sendAck(oldPendingAck);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                }
+            }
+        }
+        
+        // Flush the pending ack once half of the prefetch size has been delivered
+        // (not counting additionalWindowSize), then start counting afresh.
+        // NOTE(review): this reads info.getPrefetchSize() whereas
+        // afterMessageIsConsumed() reads info.getCurrentPrefetchSize() - confirm
+        // the difference is intended.
+        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
+            session.sendAck(pendingAck);
+            pendingAck=null;
+            deliveredCounter = 0;
+            additionalWindowSize = 0;
+        }
+    }
+
+    /**
+     * Acknowledges, with a single standard ack, every message that has been
+     * delivered to the client so far (i.e. everything in deliveredMessages).
+     * <p>
+     * In a transacted session the ack is tagged with the current transaction
+     * id and deliveredMessages is left untouched; otherwise the list is
+     * cleared. The pending ack is discarded and the delivery counters are
+     * reduced by the number of messages acked.
+     *
+     * @throws JMSException
+     */
+    public void acknowledge() throws JMSException {
+        synchronized (deliveredMessages) {
+            final MessageAck ackForAll = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+            if (ackForAll == null) {
+                // nothing has been delivered, so there is nothing to ack
+                return;
+            }
+
+            if (session.getTransacted()) {
+                session.doStartTransaction();
+                ackForAll.setTransactionId(session.getTransactionContext().getTransactionId());
+            }
+            session.sendAck(ackForAll);
+            pendingAck = null;
+
+            // shrink both counters by the number of messages just acked, never below zero
+            final int ackedCount = deliveredMessages.size();
+            deliveredCounter = Math.max(0, deliveredCounter - ackedCount);
+            additionalWindowSize = Math.max(0, additionalWindowSize - ackedCount);
+
+            if (!session.getTransacted()) {
+                deliveredMessages.clear();
+            }
+        }
+    }
+    
+    void acknowledge(MessageDispatch md) throws JMSException {
+        MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
+        session.sendAck(ack);
+        synchronized(deliveredMessages){
+            deliveredMessages.remove(md);
+        }
+    }
+
+    /**
+     * Forgets all delivered messages and resets the redelivery delay to zero.
+     *
+     * @throws JMSException declared for callers; nothing in this method
+     *                 throws it
+     */
+    public void commit() throws JMSException {
+        synchronized (deliveredMessages) {
+            deliveredMessages.clear();
+        }
+        this.redeliveryDelay = 0;
+    }
+
+    /**
+     * Rolls back the messages currently held in deliveredMessages.
+     * <p>
+     * Every delivered message is marked as rolled back and removed from the
+     * connection's duplicate tracking. If the most recently delivered message
+     * has exceeded the redelivery policy's maximum, the whole batch is
+     * poison-acked; otherwise the messages are pushed back onto the front of
+     * unconsumedMessages and delivery is restarted, immediately or after
+     * redeliveryDelay. Finally, if a message listener is set, the session is
+     * asked to redispatch the unconsumed messages.
+     *
+     * @throws JMSException if sending an ack or restarting delivery fails
+     */
+    public void rollback() throws JMSException {
+        synchronized (unconsumedMessages.getMutex()) {
+            if (optimizeAcknowledge) {
+                // remove messages read but not acked at the broker yet through
+                // optimizeAcknowledge
+                if (!this.info.isBrowser()) {
+                    synchronized(deliveredMessages) {
+                        // NOTE(review): deliveredMessages.size() shrinks with every
+                        // removeLast() while i grows, so this loop removes fewer than
+                        // min(size, ackCounter) entries - confirm that is intended.
+                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
+                            // ensure we don't filter this as a duplicate
+                            MessageDispatch md = deliveredMessages.removeLast();
+                            session.connection.rollbackDuplicate(this, md.getMessage());
+                        }
+                    }
+                }
+            }
+            synchronized(deliveredMessages) {
+                if (deliveredMessages.isEmpty()) {
+                    return;
+                }
+    
+                // Only increase the redelivery delay after the first redelivery..
+                // (entries are added with addFirst(), so getFirst() is the most
+                // recently delivered message and getLast() the oldest)
+                MessageDispatch lastMd = deliveredMessages.getFirst();
+                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
+                if (currentRedeliveryCount > 0) {
+                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                }
+                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
+    
+                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+                    MessageDispatch md = iter.next();
+                    md.getMessage().onMessageRolledBack();
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this, md.getMessage());
+                }
+    
+                // The redelivery counter is re-read here, after onMessageRolledBack()
+                // has been called on every delivered message.
+                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
+                    // We need to NACK the messages so that they get sent to the
+                    // DLQ.
+                    // Acknowledge the last message.
+                    
+                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+					ack.setFirstMessageId(firstMsgId);
+                    session.sendAck(ack,true);
+                    // Adjust the window size.
+                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
+                    redeliveryDelay = 0;
+                } else {
+                    
+                    // only redelivery_ack after first delivery
+                    if (currentRedeliveryCount > 0) {
+                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+                        ack.setFirstMessageId(firstMsgId);
+                        session.sendAck(ack,true);
+                    }
+    
+                    // stop the delivery of messages.
+                    unconsumedMessages.stop();
+    
+                    // Put every rolled-back message back at the front of the
+                    // unconsumed queue so it is delivered again.
+                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
+                        MessageDispatch md = iter.next();
+                        unconsumedMessages.enqueueFirst(md);
+                    }
+    
+                    if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+                        // Start up the delivery again a little later.
+                        scheduler.executeAfterDelay(new Runnable() {
+                            public void run() {
+                                try {
+                                    // only restart if the consumer is still marked as started
+                                    if (started.get()) {
+                                        start();
+                                    }
+                                } catch (JMSException e) {
+                                    session.connection.onAsyncException(e);
+                                }
+                            }
+                        }, redeliveryDelay);
+                    } else {
+                        start();
+                    }
+    
+                }
+                // NOTE(review): unlike acknowledge(), this subtraction is not clamped
+                // at zero - confirm deliveredCounter cannot go negative here.
+                deliveredCounter -= deliveredMessages.size();
+                deliveredMessages.clear();
+            }
+        }
+        // With a listener registered, ask the session to redispatch the unconsumed messages.
+        if (messageListener.get() != null) {
+            session.redispatch(this, unconsumedMessages);
+        }
+    }
+
+    /**
+     * Handles a dispatch arriving for this consumer.
+     * <p>
+     * If clearDispatchList is set, the queued unconsumed messages are first
+     * flushed and a pending delivered-ack is discarded. Then, unless
+     * unconsumedMessages is closed, a non-duplicate message is either
+     * delivered directly to the registered listener (when one is set and
+     * unconsumedMessages is running) or enqueued for later consumption; a
+     * duplicate is not delivered but acknowledged. Any exception is reported
+     * through session.connection.onClientInternalException instead of being
+     * thrown.
+     *
+     * @param md the dispatch to process
+     */
+    public void dispatch(MessageDispatch md) {
+        MessageListener listener = this.messageListener.get();
+        try {
+            synchronized (unconsumedMessages.getMutex()) {
+                if (clearDispatchList) {
+                    // we are reconnecting so lets flush the in progress
+                    // messages
+                    clearDispatchList = false;
+                    List<MessageDispatch> list = unconsumedMessages.removeAll();
+                    if (!this.info.isBrowser()) {
+                        for (MessageDispatch old : list) {
+                            // ensure we don't filter this as a duplicate
+                            session.connection.rollbackDuplicate(this, old.getMessage());
+                        }
+                    }
+                    if (pendingAck != null && pendingAck.isDeliveredAck()) {
+                        // on resumption a pending delivered ack will be out of sync with
+                        // re deliveries.
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
+                        }   
+                        pendingAck = null;
+                    }
+                }
+                if (!unconsumedMessages.isClosed()) {
+                    // browsers skip the duplicate check entirely
+                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
+                        if (listener != null && unconsumedMessages.isRunning()) {
+                            // asynchronous delivery straight to the listener
+                            ActiveMQMessage message = createActiveMQMessage(md);
+                            beforeMessageIsConsumed(md);
+                            try {
+                                // expired messages are not shown to the listener
+                                boolean expired = message.isExpired();
+                                if (!expired) {
+                                    listener.onMessage(message);
+                                }
+                                afterMessageIsConsumed(md, expired);
+                            } catch (RuntimeException e) {
+                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
+                                    // Redeliver the message
+                                } else {
+                                    // Transacted or Client ack: Deliver the
+                                    // next message.
+                                    afterMessageIsConsumed(md, false);
+                                }
+                                LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
+                            }
+                        } else {
+                            // no listener (or delivery is stopped): queue the message and
+                            // notify the availability listener, if any
+                            unconsumedMessages.enqueue(md);
+                            if (availableListener != null) {
+                                availableListener.onMessageAvailable(this);
+                            }
+                        }
+                    } else {
+                        // ignore duplicate
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
+                        }
+                        // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
+                        // the normal ack will happen in the transaction.
+                        if (session.isTransacted()) {
+                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                        } else {
+                            acknowledge(md);
+                        }
+                    }
+                }
+            }
+            // After every 1000th dispatch, reset the counter and yield the thread.
+            if (++dispatchedCount % 1000 == 0) {
+                dispatchedCount = 0;
+                Thread.yield();
+            }
+        } catch (Exception e) {
+            session.connection.onClientInternalException(e);
+        }
+    }
+
+    /**
+     * @return the number of messages currently held in unconsumedMessages
+     */
+    public int getMessageSize() {
+        final int queued = unconsumedMessages.size();
+        return queued;
+    }
+
+    /**
+     * Marks this consumer as started, starts unconsumedMessages and wakes up
+     * the session executor. Has no effect once unconsumedMessages is closed.
+     *
+     * @throws JMSException declared for callers; nothing in this method
+     *                 throws it directly
+     */
+    public void start() throws JMSException {
+        if (!unconsumedMessages.isClosed()) {
+            started.set(true);
+            unconsumedMessages.start();
+            session.executor.wakeup();
+        }
+    }
+
+    /**
+     * Marks this consumer as not started and stops unconsumedMessages.
+     */
+    public void stop() {
+        this.started.set(false);
+        this.unconsumedMessages.stop();
+    }
+
+    /**
+     * @return a short description containing the consumer id and the started
+     *         flag
+     */
+    public String toString() {
+        final Object consumerId = info.getConsumerId();
+        final boolean running = started.get();
+        return "ActiveMQMessageConsumer { value=" + consumerId + ", started=" + running + " }";
+    }
+
+    /**
+     * Delivers a message to the message listener.
+     * 
+     * @return
+     * @throws JMSException
+     */
+    public boolean iterate() {
+        MessageListener listener = this.messageListener.get();
+        if (listener != null) {
+            MessageDispatch md = unconsumedMessages.dequeueNoWait();
+            if (md != null) {
+                try {
+                    ActiveMQMessage message = createActiveMQMessage(md);
+                    beforeMessageIsConsumed(md);
+                    listener.onMessage(message);
+                    afterMessageIsConsumed(md, false);
+                } catch (JMSException e) {
+                    session.connection.onClientInternalException(e);
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @param destination the temporary destination to compare against
+     * @return <code>true</code> if this consumer's destination equals the
+     *         given one
+     */
+    public boolean isInUse(ActiveMQTempDestination destination) {
+        final boolean sameDestination = info.getDestination().equals(destination);
+        return sameDestination;
+    }
+
+    /**
+     * @return the broker sequence id recorded for the most recent message
+     *         that passed through beforeMessageIsConsumed()
+     */
+    public long getLastDeliveredSequenceId() {
+        return this.lastDeliveredSequenceId;
+    }
+
+	/**
+	 * @return the failure error stored via setFailureError(), if any
+	 */
+	public IOException getFailureError() {
+		return this.failureError;
+	}
+
+	/**
+	 * Stores the given error so that getFailureError() can return it.
+	 *
+	 * @param failureError the error to remember
+	 */
+	public void setFailureError(IOException failureError) {
+		this.failureError = failureError;
+	}
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.management.JMSProducerStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
+ * destination. A <CODE>MessageProducer</CODE> object is created by passing a
+ * <CODE>Destination</CODE> object to a message-producer creation method
+ * supplied by a session.
+ * <P>
+ * <CODE>MessageProducer</CODE> is the parent interface for all message
+ * producers.
+ * <P>
+ * A client also has the option of creating a message producer without supplying
+ * a destination. In this case, a destination must be provided with every send
+ * operation. A typical use for this kind of message producer is to send replies
+ * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
+ * <P>
+ * A client can specify a default delivery mode, priority, and time to live for
+ * messages sent by a message producer. It can also specify the delivery mode,
+ * priority, and time to live for an individual message.
+ * <P>
+ * A client can specify a time-to-live value in milliseconds for each message it
+ * sends. This value defines a message expiration time that is the sum of the
+ * message's time-to-live and the GMT when it is sent (for transacted sends,
+ * this is the time the client sends the message, not the time the transaction
+ * is committed).
+ * <P>
+ * A JMS provider should do its best to expire messages accurately; however, the
+ * JMS API does not define the accuracy provided.
+ * 
+ * @version $Revision: 1.14 $
+ * @see javax.jms.TopicPublisher
+ * @see javax.jms.QueueSender
+ * @see javax.jms.Session#createProducer
+ */
+public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
+
+    protected ProducerInfo info;
+    protected boolean closed;
+
+    private JMSProducerStatsImpl stats;
+    private AtomicLong messageSequence;
+    private long startTime;
+    private MessageTransformer transformer;
+    private MemoryUsage producerWindow;
+
+    /**
+     * Creates the producer, registers it with the session and announces it to
+     * the broker by sending its ProducerInfo asynchronously.
+     * <P>
+     * Options on the destination that start with <CODE>producer.</CODE> are
+     * applied to the ProducerInfo.
+     * 
+     * @param session the session this producer belongs to
+     * @param producerId the id assigned to this producer
+     * @param destination the fixed destination, or <CODE>null</CODE> for an
+     *                unidentified producer
+     * @param sendTimeout the send timeout to apply to this producer
+     * @throws JMSException if the producer info cannot be sent
+     */
+    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
+        super(session);
+        this.info = new ProducerInfo(producerId);
+        this.info.setWindowSize(session.connection.getProducerWindowSize());
+        if (destination != null && destination.getOptions() != null) {
+            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
+            IntrospectionSupport.setProperties(this.info, options, "producer.");
+        }
+        this.info.setDestination(destination);
+
+        // Enable producer window flow control if protocol >= 3 and the window
+        // size > 0
+        if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
+            producerWindow = new MemoryUsage("Producer Window: " + producerId);
+            producerWindow.setLimit(this.info.getWindowSize());
+            producerWindow.start();
+        }
+
+        this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
+        this.defaultPriority = Message.DEFAULT_PRIORITY;
+        this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
+        this.startTime = System.currentTimeMillis();
+        this.messageSequence = new AtomicLong(0);
+        this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
+        this.session.addProducer(this);
+        this.session.asyncSendPacket(info);
+        this.setSendTimeout(sendTimeout);
+        setTransformer(session.getTransformer());
+    }
+
+    /**
+     * @return the statistics collected for this producer
+     */
+    public StatsImpl getStats() {
+        return stats;
+    }
+
+    /**
+     * @return the statistics collected for this producer, as their concrete
+     *         type
+     */
+    public JMSProducerStatsImpl getProducerStats() {
+        return stats;
+    }
+
+    /**
+     * Gets the destination associated with this <CODE>MessageProducer</CODE>.
+     * 
+     * @return this producer's <CODE>Destination</CODE>
+     * @throws JMSException if the producer has been closed or the JMS provider
+     *                 fails to get the destination due to some internal error.
+     * @since 1.1
+     */
+    public Destination getDestination() throws JMSException {
+        checkClosed();
+        return this.info.getDestination();
+    }
+
+    /**
+     * Closes the message producer.
+     * <P>
+     * Since a provider may allocate some resources on behalf of a <CODE>
+     * MessageProducer</CODE>
+     * outside the Java virtual machine, clients should close them when they are
+     * not needed. Relying on garbage collection to eventually reclaim these
+     * resources may not be timely enough.
+     * 
+     * @throws JMSException if the JMS provider fails to close the producer due
+     *                 to some internal error.
+     */
+    public void close() throws JMSException {
+        if (!closed) {
+            dispose();
+            this.session.asyncSendPacket(info.createRemoveCommand());
+        }
+    }
+
+    /**
+     * Releases the local resources of this producer: removes it from the
+     * session, stops the producer window (if any) and marks the producer as
+     * closed. Nothing is sent to the broker. Calling it again has no effect.
+     */
+    public void dispose() {
+        if (!closed) {
+            this.session.removeProducer(this);
+            if (producerWindow != null) {
+                producerWindow.stop();
+            }
+            closed = true;
+        }
+    }
+
+    /**
+     * Check if the instance of this producer has been closed.
+     * 
+     * @throws IllegalStateException if the producer has been closed
+     */
+    protected void checkClosed() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("The producer is closed");
+        }
+    }
+
+    /**
+     * Sends a message to a destination for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     * <P>
+     * Typically, a message producer is assigned a destination at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the destination be supplied every time a message is sent.
+     * <P>
+     * If this producer has a producer window, the call waits for space in the
+     * window before sending. When the waiting thread is interrupted, the
+     * thread's interrupt status is restored and a JMSException linked to the
+     * InterruptedException is thrown.
+     * 
+     * @param destination the destination to send this message to
+     * @param message the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority the priority for this message
+     * @param timeToLive the message's lifetime (in milliseconds)
+     * @throws JMSException if the JMS provider fails to send the message due to
+     *                 some internal error.
+     * @throws UnsupportedOperationException if an invalid destination is
+     *                 specified.
+     * @throws InvalidDestinationException if a client uses this method with an
+     *                 invalid destination.
+     * @see javax.jms.Session#createProducer
+     * @since 1.1
+     */
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        checkClosed();
+        if (destination == null) {
+            if (info.getDestination() == null) {
+                throw new UnsupportedOperationException("A destination must be specified.");
+            }
+            throw new InvalidDestinationException("Don't understand null destinations");
+        }
+
+        ActiveMQDestination dest;
+        if (destination == info.getDestination()) {
+            dest = (ActiveMQDestination)destination;
+        } else if (info.getDestination() == null) {
+            dest = ActiveMQDestination.transform(destination);
+        } else {
+            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
+        }
+        if (dest == null) {
+            throw new JMSException("No destination specified");
+        }
+
+        if (transformer != null) {
+            Message transformedMessage = transformer.producerTransform(session, this, message);
+            if (transformedMessage != null) {
+                message = transformedMessage;
+            }
+        }
+
+        if (producerWindow != null) {
+            try {
+                producerWindow.waitForSpace();
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so callers further up the stack
+                // can still observe the interruption, and keep the cause.
+                Thread.currentThread().interrupt();
+                JMSException aborted = new JMSException("Send aborted due to thread interrupt.");
+                aborted.setLinkedException(e);
+                aborted.initCause(e);
+                throw aborted;
+            }
+        }
+
+        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
+
+        stats.onMessage();
+    }
+
+    /**
+     * @return the transformer applied to messages before they are sent, or
+     *         <CODE>null</CODE> if none is set
+     */
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on
+     * to the JMS bus
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
+    /**
+     * @return the time in milli second when this object was created.
+     */
+    protected long getStartTime() {
+        return this.startTime;
+    }
+
+    /**
+     * Increments the message sequence and returns the new value; every call
+     * therefore yields a different number.
+     * 
+     * @return the next message sequence number.
+     */
+    protected long getMessageSequence() {
+        return messageSequence.incrementAndGet();
+    }
+
+    /**
+     * @param messageSequence The messageSequence to set.
+     */
+    protected void setMessageSequence(AtomicLong messageSequence) {
+        this.messageSequence = messageSequence;
+    }
+
+    /**
+     * @return Returns the info.
+     */
+    protected ProducerInfo getProducerInfo() {
+        return this.info;
+    }
+
+    /**
+     * @param info The info to set
+     */
+    protected void setProducerInfo(ProducerInfo info) {
+        this.info = info;
+    }
+
+    public String toString() {
+        return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
+    }
+
+    /**
+     * Handles a producer ack: reduces the producer window usage, if a window
+     * is in use, by the size carried in the ack.
+     * 
+     * @param pa the ack carrying the size to release
+     */
+    public void onProducerAck(ProducerAck pa) {
+        if (this.producerWindow != null) {
+            this.producerWindow.decreaseUsage(pa.getSize());
+        }
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * A useful base class for implementing a {@link MessageProducer}
+ *
+ * @version $Revision: $
+ */
+public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
+    protected ActiveMQSession session;
+    protected boolean disableMessageID;
+    protected boolean disableMessageTimestamp;
+    protected int defaultDeliveryMode;
+    protected int defaultPriority;
+    protected long defaultTimeToLive;
+    protected int sendTimeout=0;
+
+    public ActiveMQMessageProducerSupport(ActiveMQSession session) {
+        this.session = session;
+        disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
+    }
+
+    /**
+     * Sets whether message IDs are disabled.
+     * <P>
+     * Since message IDs take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the message ID is not used by an application.
+     * By calling the <CODE>setDisableMessageID</CODE> method on this message
+     * producer, a JMS client enables this potential optimization for all
+     * messages sent by this message producer. If the JMS provider accepts this
+     * hint, these messages must have the message ID set to null; if the
+     * provider ignores the hint, the message ID must be set to its normal
+     * unique value.
+     * <P>
+     * Message IDs are enabled by default.
+     *
+     * @param value indicates if message IDs are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
+     *                      some internal error.
+     */
+    public void setDisableMessageID(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageID = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return an indication of whether message IDs are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to determine if message IDs are
+     *                      disabled due to some internal error.
+     */
+    public boolean getDisableMessageID() throws JMSException {
+        checkClosed();
+        return this.disableMessageID;
+    }
+
+    /**
+     * Sets whether message timestamps are disabled.
+     * <P>
+     * Since timestamps take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the timestamp is not used by an application.
+     * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
+     * message producer, a JMS client enables this potential optimization for
+     * all messages sent by this message producer. If the JMS provider accepts
+     * this hint, these messages must have the timestamp set to zero; if the
+     * provider ignores the hint, the timestamp must be set to its normal
+     * value.
+     * <P>
+     * Message timestamps are enabled by default.
+     *
+     * @param value indicates if message timestamps are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
+     *                      some internal error.
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageTimestamp = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
+     *                      some internal error.
+     */
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        checkClosed();
+        return this.disableMessageTimestamp;
+    }
+
+    /**
+     * Sets the producer's default delivery mode.
+     * <P>
+     * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
+     *
+     * @param newDeliveryMode the message delivery mode for this message producer; legal
+     *                        values are <code>DeliveryMode.NON_PERSISTENT</code> and
+     *                        <code>DeliveryMode.PERSISTENT</code>
+     * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getDeliveryMode
+     * @see javax.jms.DeliveryMode#NON_PERSISTENT
+     * @see javax.jms.DeliveryMode#PERSISTENT
+     * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
+     */
+    public void setDeliveryMode(int newDeliveryMode) throws JMSException {
+        if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
+            throw new javax.jms.IllegalStateException("unkown delivery mode: " + newDeliveryMode);
+        }
+        checkClosed();
+        this.defaultDeliveryMode = newDeliveryMode;
+    }
+
+    /**
+     * Gets the producer's default delivery mode.
+     *
+     * @return the message delivery mode for this message producer
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
+     *                      some internal error.
+     */
+    public int getDeliveryMode() throws JMSException {
+        checkClosed();
+        return this.defaultDeliveryMode;
+    }
+
+    /**
+     * Sets the producer's default priority.
+     * <P>
+     * The JMS API defines ten levels of priority value, with 0 as the lowest
+     * priority and 9 as the highest. Clients should consider priorities 0-4 as
+     * gradations of normal priority and priorities 5-9 as gradations of
+     * expedited priority. Priority is set to 4 by default.
+     *
+     * @param newDefaultPriority the message priority for this message producer; must be a
+     *                           value between 0 and 9
+     * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getPriority
+     * @see javax.jms.Message#DEFAULT_PRIORITY
+     */
+    public void setPriority(int newDefaultPriority) throws JMSException {
+        if (newDefaultPriority < 0 || newDefaultPriority > 9) {
+            throw new IllegalStateException("default priority must be a value between 0 and 9");
+        }
+        checkClosed();
+        this.defaultPriority = newDefaultPriority;
+    }
+
+    /**
+     * Gets the producer's default priority.
+     *
+     * @return the message priority for this message producer
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#setPriority
+     */
+    public int getPriority() throws JMSException {
+        checkClosed();
+        return this.defaultPriority;
+    }
+
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     * <P>
+     * Time to live is set to zero by default.
+     *
+     * @param timeToLive the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException if the JMS provider fails to set the time to live due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getTimeToLive
+     * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        if (timeToLive < 0L) {
+            throw new IllegalStateException("cannot set a negative timeToLive");
+        }
+        checkClosed();
+        this.defaultTimeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException if the JMS provider fails to get the time to live due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
+    public long getTimeToLive() throws JMSException {
+        checkClosed();
+        return this.defaultTimeToLive;
+    }
+
+    /**
+     * Sends a message using the <CODE>MessageProducer</CODE>'s default
+     * delivery mode, priority, and time to live.
+     *
+     * @param message the message to send
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that did not specify a
+     *                                     destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     * @since 1.1
+     */
+    public void send(Message message) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+    /**
+     * Sends a message to the destination, specifying delivery mode, priority,
+     * and time to live.
+     *
+     * @param message      the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that did not specify a
+     *                                     destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @since 1.1
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  deliveryMode,
+                  priority,
+                  timeToLive);
+    }
+
+    /**
+     * Sends a message to a destination for an unidentified message producer.
+     * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
+     * priority, and time to live.
+     * <P>
+     * Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination the destination to send this message to
+     * @param message     the message to send
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that specified a destination at
+     *                                     creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     */
+    public void send(Destination destination, Message message) throws JMSException {
+        this.send(destination,
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+
+    protected abstract void checkClosed() throws IllegalStateException;
+
+    /**
+     * @return the sendTimeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * @param sendTimeout the sendTimeout to set
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific
+ * ones.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public final class ActiveMQMessageTransformation {
+
+    private ActiveMQMessageTransformation() {    
+    }
+    
+    /**
+     * Creates a an available JMS message from another provider.
+     * 
+     * @param destination - Destination to be converted into ActiveMQ's
+     *                implementation.
+     * @return ActiveMQDestination - ActiveMQ's implementation of the
+     *         destination.
+     * @throws JMSException if an error occurs
+     */
+    public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
+        ActiveMQDestination activeMQDestination = null;
+
+        if (destination != null) {
+            if (destination instanceof ActiveMQDestination) {
+                return (ActiveMQDestination)destination;
+
+            } else {
+                if (destination instanceof TemporaryQueue) {
+                    activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
+                } else if (destination instanceof TemporaryTopic) {
+                    activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
+                } else if (destination instanceof Queue) {
+                    activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
+                } else if (destination instanceof Topic) {
+                    activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
+                }
+            }
+        }
+
+        return activeMQDestination;
+    }
+
+    /**
+     * Creates a fast shallow copy of the current ActiveMQMessage or creates a
+     * whole new message instance from an available JMS message from another
+     * provider.
+     * 
+     * @param message - Message to be converted into ActiveMQ's implementation.
+     * @param connection
+     * @return ActiveMQMessage - ActiveMQ's implementation object of the
+     *         message.
+     * @throws JMSException if an error occurs
+     */
+    public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
+        throws JMSException {
+        if (message instanceof ActiveMQMessage) {
+            return (ActiveMQMessage)message;
+
+        } else {
+            ActiveMQMessage activeMessage = null;
+
+            if (message instanceof BytesMessage) {
+                BytesMessage bytesMsg = (BytesMessage)message;
+                bytesMsg.reset();
+                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+                msg.setConnection(connection);
+                try {
+                    for (;;) {
+                        // Reads a byte from the message stream until the stream
+                        // is empty
+                        msg.writeByte(bytesMsg.readByte());
+                    }
+                } catch (MessageEOFException e) {
+                    // if an end of message stream as expected
+                } catch (JMSException e) {
+                }
+
+                activeMessage = msg;
+            } else if (message instanceof MapMessage) {
+                MapMessage mapMsg = (MapMessage)message;
+                ActiveMQMapMessage msg = new ActiveMQMapMessage();
+                msg.setConnection(connection);
+                Enumeration iter = mapMsg.getMapNames();
+
+                while (iter.hasMoreElements()) {
+                    String name = iter.nextElement().toString();
+                    msg.setObject(name, mapMsg.getObject(name));
+                }
+
+                activeMessage = msg;
+            } else if (message instanceof ObjectMessage) {
+                ObjectMessage objMsg = (ObjectMessage)message;
+                ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
+                msg.setConnection(connection);
+                msg.setObject(objMsg.getObject());
+                msg.storeContent();
+                activeMessage = msg;
+            } else if (message instanceof StreamMessage) {
+                StreamMessage streamMessage = (StreamMessage)message;
+                streamMessage.reset();
+                ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
+                msg.setConnection(connection);
+                Object obj = null;
+
+                try {
+                    while ((obj = streamMessage.readObject()) != null) {
+                        msg.writeObject(obj);
+                    }
+                } catch (MessageEOFException e) {
+                    // if an end of message stream as expected
+                } catch (JMSException e) {
+                }
+
+                activeMessage = msg;
+            } else if (message instanceof TextMessage) {
+                TextMessage textMsg = (TextMessage)message;
+                ActiveMQTextMessage msg = new ActiveMQTextMessage();
+                msg.setConnection(connection);
+                msg.setText(textMsg.getText());
+                activeMessage = msg;
+            } else {
+                activeMessage = new ActiveMQMessage();
+                activeMessage.setConnection(connection);
+            }
+
+            copyProperties(message, activeMessage);
+
+            return activeMessage;
+        }
+    }
+
+    /**
+     * Copies the standard JMS and user defined properties from the givem
+     * message to the specified message
+     * 
+     * @param fromMessage the message to take the properties from
+     * @param toMessage the message to add the properties to
+     * @throws JMSException
+     */
+    public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
+        toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
+        toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
+        toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
+        toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
+        toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+        toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
+        toMessage.setJMSType(fromMessage.getJMSType());
+        toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
+        toMessage.setJMSPriority(fromMessage.getJMSPriority());
+        toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
+
+        Enumeration propertyNames = fromMessage.getPropertyNames();
+
+        while (propertyNames.hasMoreElements()) {
+            String name = propertyNames.nextElement().toString();
+            Object obj = fromMessage.getObjectProperty(name);
+            toMessage.setObjectProperty(name, obj);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
------------------------------------------------------------------------------
    svn:executable = *