You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [9/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * A listener which is notified if a message is available for processing via the
+ * receive methods. Typically on receiving this notification you can call 
+ * {@link MessageConsumer#receiveNoWait()} to get the new message immediately.
+ * 
+ * Note that this notification just indicates a message is available for synchronous consumption,
+ * it does not actually consume the message.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface MessageAvailableListener {
+
+    void onMessageAvailable(MessageConsumer consumer);
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.MessageDispatch;
+
+public class MessageDispatchChannel {
+
+    private final Object mutex = new Object();
+    private final LinkedList<MessageDispatch> list;
+    private boolean closed;
+    private boolean running;
+
+    public MessageDispatchChannel() {
+        this.list = new LinkedList<MessageDispatch>();
+    }
+
+    public void enqueue(MessageDispatch message) {
+        synchronized (mutex) {
+            list.addLast(message);
+            mutex.notify();
+        }
+    }
+
+    public void enqueueFirst(MessageDispatch message) {
+        synchronized (mutex) {
+            list.addFirst(message);
+            mutex.notify();
+        }
+    }
+
+    public boolean isEmpty() {
+        synchronized (mutex) {
+            return list.isEmpty();
+        }
+    }
+
+    /**
+     * Used to get an enqueued message. The amount of time this method blocks is
+     * based on the timeout value. - if timeout==-1 then it blocks until a
+     * message is received. - if timeout==0 then it it tries to not block at
+     * all, it returns a message if it is available - if timeout>0 then it
+     * blocks up to timeout amount of time. Expired messages will consumed by
+     * this method.
+     * 
+     * @throws JMSException
+     * @return null if we timeout or if the consumer is closed.
+     * @throws InterruptedException
+     */
+    public MessageDispatch dequeue(long timeout) throws InterruptedException {
+        synchronized (mutex) {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
+                if (timeout == -1) {
+                    mutex.wait();
+                } else {
+                    mutex.wait(timeout);
+                    break;
+                }
+            }
+            if (closed || !running || list.isEmpty()) {
+                return null;
+            }
+            return list.removeFirst();
+        }
+    }
+
+    public MessageDispatch dequeueNoWait() {
+        synchronized (mutex) {
+            if (closed || !running || list.isEmpty()) {
+                return null;
+            }
+            return list.removeFirst();
+        }
+    }
+
+    public MessageDispatch peek() {
+        synchronized (mutex) {
+            if (closed || !running || list.isEmpty()) {
+                return null;
+            }
+            return list.getFirst();
+        }
+    }
+
+    public void start() {
+        synchronized (mutex) {
+            running = true;
+            mutex.notifyAll();
+        }
+    }
+
+    public void stop() {
+        synchronized (mutex) {
+            running = false;
+            mutex.notifyAll();
+        }
+    }
+
+    public void close() {
+        synchronized (mutex) {
+            if (!closed) {
+                running = false;
+                closed = true;
+            }
+            mutex.notifyAll();
+        }
+    }
+
+    public void clear() {
+        synchronized (mutex) {
+            list.clear();
+        }
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public int size() {
+        synchronized (mutex) {
+            return list.size();
+        }
+    }
+
+    public Object getMutex() {
+        return mutex;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public List<MessageDispatch> removeAll() {
+        synchronized (mutex) {
+            ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
+            list.clear();
+            return rc;
+        }
+    }
+
+    public String toString() {
+        synchronized (mutex) {
+            return list.toString();
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * A plugin strategy for transforming a message before it is sent by the JMS client or before it is
+ * dispatched to the JMS consumer
+ *
+ * @version $Revision: 564271 $
+ */
+public interface MessageTransformer {
+
+    /**
+     * Transforms the given message inside the producer before it is sent to the JMS bus.
+     */
+    Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
+
+    /**
+     * Transforms the given message inside the consumer before being dispatched to the client code
+     */
+    Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A useful base class for message transformers.
+ *
+ * @version $Revision: 563921 $
+ */
+public abstract class MessageTransformerSupport implements MessageTransformer {
+
+    /**
+     * Copies the standard JMS and user defined properties from the givem message to the specified message
+     *
+     * @param fromMessage the message to take the properties from
+     * @param toMesage the message to add the properties to
+     * @throws JMSException
+     */
+    protected void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
+        ActiveMQMessageTransformation.copyProperties(fromMessage, toMesage);
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when an operation is invoked on a service
+ * which has not yet been started.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class NotStartedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -4907909323529887659L;
+
+    public NotStartedException() {
+        super("IllegalState: This service has not yet been started", "AMQ-1003");
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.Random;
+
+/**
+ * Configuration options used to control how messages are re-delivered when they
+ * are rolled back.
+ * 
+ * @org.apache.xbean.XBean element="redeliveryPolicy"
+ * @version $Revision: 1.11 $
+ */
+public class RedeliveryPolicy implements Cloneable, Serializable {
+
+    public static final int NO_MAXIMUM_REDELIVERIES = -1;
+    private static Random randomNumberGenerator;
+
+    // +/-15% for a 30% spread -cgs
+    private double collisionAvoidanceFactor = 0.15d;
+    private int maximumRedeliveries = 6;
+    private long initialRedeliveryDelay = 1000L;
+    private boolean useCollisionAvoidance;
+    private boolean useExponentialBackOff;
+    private short backOffMultiplier = 5;
+
+    public RedeliveryPolicy() {
+    }
+
+    public RedeliveryPolicy copy() {
+        try {
+            return (RedeliveryPolicy)clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Could not clone: " + e, e);
+        }
+    }
+
+    public short getBackOffMultiplier() {
+        return backOffMultiplier;
+    }
+
+    public void setBackOffMultiplier(short backOffMultiplier) {
+        this.backOffMultiplier = backOffMultiplier;
+    }
+
+    public short getCollisionAvoidancePercent() {
+        return (short)Math.round(collisionAvoidanceFactor * 100);
+    }
+
+    public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+        this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+    }
+
+    public long getInitialRedeliveryDelay() {
+        return initialRedeliveryDelay;
+    }
+
+    public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
+        this.initialRedeliveryDelay = initialRedeliveryDelay;
+    }
+
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    public long getRedeliveryDelay(long previousDelay) {
+        long redeliveryDelay;
+
+        if (previousDelay == 0) {
+            redeliveryDelay = initialRedeliveryDelay;
+        } else if (useExponentialBackOff && backOffMultiplier > 1) {
+            redeliveryDelay = previousDelay * backOffMultiplier;
+        } else {
+            redeliveryDelay = previousDelay;
+        }
+
+        if (useCollisionAvoidance) {
+            /*
+             * First random determines +/-, second random determines how far to
+             * go in that direction. -cgs
+             */
+            Random random = getRandomNumberGenerator();
+            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
+            redeliveryDelay += redeliveryDelay * variance;
+        }
+
+        return redeliveryDelay;
+    }
+
+    public boolean isUseCollisionAvoidance() {
+        return useCollisionAvoidance;
+    }
+
+    public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+        this.useCollisionAvoidance = useCollisionAvoidance;
+    }
+
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
+    protected static synchronized Random getRandomNumberGenerator() {
+        if (randomNumberGenerator == null) {
+            randomNumberGenerator = new Random();
+        }
+        return randomNumberGenerator;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * The core lifecyle interface for ActiveMQ components.
+ *  
+ * If there was a standard way to do so, it'd be good to register this 
+ * interface with Spring so it treats the start/stop methods as those of
+ * {@link org.springframework.beans.factory.InitializingBean} 
+ * and {@link org.springframework.beans.factory.DisposableBean}
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface Service {
+
+    void start() throws Exception;
+    
+    void stop() throws Exception;
+    
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * The StreamConnection interface allows you to send and receive data from a
+ * Destination in using standard java InputStream and OutputStream objects. It's
+ * best use case is to send and receive large amounts of data that would be to
+ * large to hold in a single JMS message.
+ * 
+ * @version $Revision$
+ */
+public interface StreamConnection extends Connection {
+
+    InputStream createInputStream(Destination dest) throws JMSException;
+
+    InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
+
+    InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
+
+    InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
+
+    InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
+
+    InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
+
+    OutputStream createOutputStream(Destination dest) throws JMSException;
+
+    OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
+
+    /**
+     * Unsubscribes a durable subscription that has been created by a client.
+     * <P>
+     * This method deletes the state being maintained on behalf of the
+     * subscriber by its provider.
+     * <P>
+     * It is erroneous for a client to delete a durable subscription while there
+     * is an active <CODE>MessageConsumer </CODE> or
+     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
+     * message is part of a pending transaction or has not been acknowledged in
+     * the session.
+     * 
+     * @param name the name used to identify this subscription
+     * @throws JMSException if the session fails to unsubscribe to the durable
+     *                 subscription due to some internal error.
+     * @throws InvalidDestinationException if an invalid subscription name is
+     *                 specified.
+     * @since 1.1
+     */
+    void unsubscribe(String name) throws JMSException;
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+* A holder for different thread priorites used in ActiveMQ
+* 
+* @version $Revision: 1.9 $
+*/
+
+public interface ThreadPriorities {
+    int INBOUND_BROKER_CONNECTION = 6;
+    int OUT_BOUND_BROKER_DISPATCH = 6;
+    int INBOUND_CLIENT_CONNECTION = 7;
+    int INBOUND_CLIENT_SESSION = 7;
+    int BROKER_MANAGEMENT = 9;
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.IntegerResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A TransactionContext provides the means to control a JMS transaction. It
+ * provides a local transaction interface and also an XAResource interface. <p/>
+ * An application server controls the transactional assignment of an XASession
+ * by obtaining its XAResource. It uses the XAResource to assign the session to
+ * a transaction, prepare and commit work on the transaction, and so on. <p/> An
+ * XAResource provides some fairly sophisticated facilities for interleaving
+ * work on multiple transactions, recovering a list of transactions in progress,
+ * and so on. A JTA aware JMS provider must fully implement this functionality.
+ * This could be done by using the services of a database that supports XA, or a
+ * JMS provider may choose to implement this functionality from scratch. <p/>
+ * 
+ * @version $Revision: 1.10 $
+ * @see javax.jms.Session
+ * @see javax.jms.QueueSession
+ * @see javax.jms.TopicSession
+ * @see javax.jms.XASession
+ */
+public class TransactionContext implements XAResource {
+
+    private static final Log LOG = LogFactory.getLog(TransactionContext.class);
+
+    // XATransactionId -> ArrayList of TransactionContext objects
+    private final ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
+
+    private final ActiveMQConnection connection;
+    private final LongSequenceGenerator localTransactionIdGenerator;
+    private final ConnectionId connectionId;
+    private List<Synchronization> synchronizations;
+
+    // To track XA transactions.
+    private Xid associatedXid;
+    private TransactionId transactionId;
+    private LocalTransactionEventListener localTransactionEventListener;
+
+    public TransactionContext(ActiveMQConnection connection) {
+        this.connection = connection;
+        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
+        this.connectionId = connection.getConnectionInfo().getConnectionId();
+    }
+
+    public boolean isInXATransaction() {
+        return transactionId != null && transactionId.isXATransaction();
+    }
+
+    public boolean isInLocalTransaction() {
+        return transactionId != null && transactionId.isLocalTransaction();
+    }
+
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+    
+    /**
+     * @return Returns the localTransactionEventListener.
+     */
+    public LocalTransactionEventListener getLocalTransactionEventListener() {
+        return localTransactionEventListener;
+    }
+
+    /**
+     * Used by the resource adapter to listen to transaction events.
+     * 
+     * @param localTransactionEventListener The localTransactionEventListener to
+     *                set.
+     */
+    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
+        this.localTransactionEventListener = localTransactionEventListener;
+    }
+
+    // ///////////////////////////////////////////////////////////
+    //
+    // Methods that work with the Synchronization objects registered with
+    // the transaction.
+    //
+    // ///////////////////////////////////////////////////////////
+
+    public void addSynchronization(Synchronization s) {
+        if (synchronizations == null) {
+            synchronizations = new ArrayList<Synchronization>(10);
+        }
+        synchronizations.add(s);
+    }
+
+    private void afterRollback() throws JMSException {
+        if (synchronizations == null) {
+            return;
+        }
+
+        int size = synchronizations.size();
+        try {
+            for (int i = 0; i < size; i++) {
+                synchronizations.get(i).afterRollback();
+            }
+        } catch (JMSException e) {
+            throw e;
+        } catch (Throwable e) {
+            throw JMSExceptionSupport.create(e);
+        } finally {
+            synchronizations = null;
+        }
+    }
+
+    private void afterCommit() throws JMSException {
+        if (synchronizations == null) {
+            return;
+        }
+
+        int size = synchronizations.size();
+        try {
+            for (int i = 0; i < size; i++) {
+                synchronizations.get(i).afterCommit();
+            }
+        } catch (JMSException e) {
+            throw e;
+        } catch (Throwable e) {
+            throw JMSExceptionSupport.create(e);
+        } finally {
+        	synchronizations = null;
+        }
+    }
+
+    private void beforeEnd() throws JMSException {
+        if (synchronizations == null) {
+            return;
+        }
+
+        int size = synchronizations.size();
+        try {
+            for (int i = 0; i < size; i++) {
+                synchronizations.get(i).beforeEnd();
+            }
+        } catch (JMSException e) {
+            throw e;
+        } catch (Throwable e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    // ///////////////////////////////////////////////////////////
+    //
+    // Local transaction interface.
+    //
+    // ///////////////////////////////////////////////////////////
+
+    /**
+     * Start a local transaction.
+     * @throws javax.jms.JMSException on internal error
+     */
+    public void begin() throws JMSException {
+
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
+        }
+        
+        if (transactionId == null) {
+            synchronizations = null;
+            this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
+            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
+            this.connection.ensureConnectionInfoSent();
+            this.connection.asyncSendPacket(info);
+
+            // Notify the listener that the tx was started.
+            if (localTransactionEventListener != null) {
+                localTransactionEventListener.beginEvent();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Begin:" + transactionId);
+            }
+        }
+        
+    }
+
+    /**
+     * Rolls back any work done in this transaction and releases any locks
+     * currently held.
+     * 
+     * @throws JMSException if the JMS provider fails to roll back the
+     *                 transaction due to some internal error.
+     * @throws javax.jms.IllegalStateException if the method is not called by a
+     *                 transacted session.
+     */
+    public void rollback() throws JMSException {
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
+        }
+        
+        beforeEnd();
+        if (transactionId != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Rollback: "  + transactionId
+                + " syncCount: " 
+                + (synchronizations != null ? synchronizations.size() : 0));
+            }
+
+            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
+            this.transactionId = null;
+            this.connection.asyncSendPacket(info);
+            // Notify the listener that the tx was rolled back
+            if (localTransactionEventListener != null) {
+                localTransactionEventListener.rollbackEvent();
+            }
+        }
+
+        afterRollback();
+    }
+
+    /**
+     * Commits all work done in this transaction and releases any locks
+     * currently held.
+     * 
+     * @throws JMSException if the JMS provider fails to commit the transaction
+     *                 due to some internal error.
+     * @throws javax.jms.IllegalStateException if the method is not called by a
+     *                 transacted session.
+     */
+    public void commit() throws JMSException {
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
+        }
+        
+        beforeEnd();
+
+        // Only send commit if the transaction was started.
+        if (transactionId != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit: "  + transactionId
+                        + " syncCount: " 
+                        + (synchronizations != null ? synchronizations.size() : 0));
+            }
+
+            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
+            this.transactionId = null;
+            // Notify the listener that the tx was committed back
+            this.connection.syncSendPacket(info);
+            if (localTransactionEventListener != null) {
+                localTransactionEventListener.commitEvent();
+            }
+            afterCommit();
+        }
+    }
+
+    // ///////////////////////////////////////////////////////////
+    //
+    // XAResource Implementation
+    //
+    // ///////////////////////////////////////////////////////////
+    /**
+     * Associates a transaction with the resource.
+     */
+    public void start(Xid xid, int flags) throws XAException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start: " + xid);
+        }
+        if (isInLocalTransaction()) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        // Are we already associated?
+        if (associatedXid != null) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        // if ((flags & TMJOIN) == TMJOIN) {
+        // // TODO: verify that the server has seen the xid
+        // }
+        // if ((flags & TMJOIN) == TMRESUME) {
+        // // TODO: verify that the xid was suspended.
+        // }
+
+        // associate
+        synchronizations = null;
+        setXid(xid);
+    }
+
+    /**
+     * @return connectionId for connection
+     */
+    private ConnectionId getConnectionId() {
+        return connection.getConnectionInfo().getConnectionId();
+    }
+
+    public void end(Xid xid, int flags) throws XAException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("End: " + xid);
+        }
+        
+        if (isInLocalTransaction()) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        
+        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
+            // You can only suspend the associated xid.
+            if (!equals(associatedXid, xid)) {
+                throw new XAException(XAException.XAER_PROTO);
+            }
+
+            // TODO: we may want to put the xid in a suspended list.
+            try {
+                beforeEnd();
+            } catch (JMSException e) {
+                throw toXAException(e);
+            }
+            setXid(null);
+        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
+            // set to null if this is the current xid.
+            // otherwise this could be an asynchronous success call
+            if (equals(associatedXid, xid)) {
+                try {
+                    beforeEnd();
+                } catch (JMSException e) {
+                    throw toXAException(e);
+                }
+                setXid(null);
+            }
+        } else {
+            throw new XAException(XAException.XAER_INVAL);
+        }
+    }
+
+    private boolean equals(Xid xid1, Xid xid2) {
+        if (xid1 == xid2) {
+            return true;
+        }
+        if (xid1 == null ^ xid2 == null) {
+            return false;
+        }
+        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
+               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
+    }
+
+    public int prepare(Xid xid) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Prepare: " + xid);
+        }
+        
+        // We allow interleaving multiple transactions, so
+        // we don't limit prepare to the associated xid.
+        XATransactionId x;
+        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
+        // called first
+        if (xid == null || (equals(associatedXid, xid))) {
+            throw new XAException(XAException.XAER_PROTO);
+        } else {
+            // TODO: cache the known xids so we don't keep recreating this one??
+            x = new XATransactionId(xid);
+        }
+
+        try {
+            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
+
+            // Find out if the server wants to commit or rollback.
+            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
+            return response.getResult();
+
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+    }
+
+    public void rollback(Xid xid) throws XAException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rollback: " + xid);
+        }
+        
+        // We allow interleaving multiple transactions, so
+        // we don't limit rollback to the associated xid.
+        XATransactionId x;
+        if (xid == null) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        if (equals(associatedXid, xid)) {
+            // I think this can happen even without an end(xid) call. Need to
+            // check spec.
+            x = (XATransactionId)transactionId;
+        } else {
+            x = new XATransactionId(xid);
+        }
+
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            // Let the server know that the tx is rollback.
+            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
+            this.connection.syncSendPacket(info);
+
+            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+            if (l != null && !l.isEmpty()) {
+                for (TransactionContext ctx : l) {
+                    ctx.afterRollback();
+                }
+            }
+
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+    }
+
+    // XAResource interface
+    public void commit(Xid xid, boolean onePhase) throws XAException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Commit: " + xid);
+        }
+        
+        // We allow interleaving multiple transactions, so
+        // we don't limit commit to the associated xid.
+        XATransactionId x;
+        if (xid == null || (equals(associatedXid, xid))) {
+            // should never happen, end(xid,TMSUCCESS) must have been previously
+            // called
+            throw new XAException(XAException.XAER_PROTO);
+        } else {
+            x = new XATransactionId(xid);
+        }
+
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            // Notify the server that the tx was committed back
+            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
+
+            this.connection.syncSendPacket(info);
+
+            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+            if (l != null && !l.isEmpty()) {
+                for (TransactionContext ctx : l) {
+                    ctx.afterCommit();
+                }
+            }
+
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+
+    }
+
+    public void forget(Xid xid) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Forget: " + xid);
+        }
+        
+        // We allow interleaving multiple transactions, so
+        // we don't limit forget to the associated xid.
+        XATransactionId x;
+        if (xid == null) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        if (equals(associatedXid, xid)) {
+            // TODO determine if this can happen... I think not.
+            x = (XATransactionId)transactionId;
+        } else {
+            x = new XATransactionId(xid);
+        }
+
+        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
+
+        try {
+            // Tell the server to forget the transaction.
+            this.connection.syncSendPacket(info);
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+    }
+
+    public boolean isSameRM(XAResource xaResource) throws XAException {
+        if (xaResource == null) {
+            return false;
+        }
+        if (!(xaResource instanceof TransactionContext)) {
+            return false;
+        }
+        TransactionContext xar = (TransactionContext)xaResource;
+        try {
+            return getResourceManagerId().equals(xar.getResourceManagerId());
+        } catch (Throwable e) {
+            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
+        }
+    }
+
+    public Xid[] recover(int flag) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recover: " + flag);
+        }
+        
+        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
+            DataStructure[] data = receipt.getData();
+            XATransactionId[] answer;
+            if (data instanceof XATransactionId[]) {
+                answer = (XATransactionId[])data;
+            } else {
+                answer = new XATransactionId[data.length];
+                System.arraycopy(data, 0, answer, 0, data.length);
+            }
+            return answer;
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+    }
+
+    public int getTransactionTimeout() throws XAException {
+        return 0;
+    }
+
+    public boolean setTransactionTimeout(int seconds) throws XAException {
+        return false;
+    }
+
+    // ///////////////////////////////////////////////////////////
+    //
+    // Helper methods.
+    //
+    // ///////////////////////////////////////////////////////////
+    private String getResourceManagerId() throws JMSException {
+        return this.connection.getResourceManagerId();
+    }
+
+    private void setXid(Xid xid) throws XAException {
+
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+
+        if (xid != null) {
+            // associate
+            associatedXid = xid;
+            transactionId = new XATransactionId(xid);
+
+            TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
+            try {
+                this.connection.asyncSendPacket(info);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Started XA transaction: " + transactionId);
+                }
+            } catch (JMSException e) {
+                throw toXAException(e);
+            }
+
+        } else {
+
+            if (transactionId != null) {
+                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
+                try {
+                    this.connection.syncSendPacket(info);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ended XA transaction: " + transactionId);
+                    }
+                } catch (JMSException e) {
+                    throw toXAException(e);
+                }
+
+                // Add our self to the list of contexts that are interested in
+                // post commit/rollback events.
+                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+                if (l == null) {
+                    l = new ArrayList<TransactionContext>(3);
+                    ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+                    l.add(this);
+                } else if (!l.contains(this)) {
+                    l.add(this);
+                }
+            }
+
+            // dis-associate
+            associatedXid = null;
+            transactionId = null;
+        }
+    }
+
+    /**
+     * Converts a JMSException from the server to an XAException. if the
+     * JMSException contained a linked XAException that is returned instead.
+     * 
+     * @param e JMSException to convert
+     * @return XAException wrapping original exception or its message
+     */
+    private XAException toXAException(JMSException e) {
+        if (e.getCause() != null && e.getCause() instanceof XAException) {
+            XAException original = (XAException)e.getCause();
+            XAException xae = new XAException(original.getMessage());
+            xae.errorCode = original.errorCode;
+            xae.initCause(original);
+            return xae;
+        }
+
+        XAException xae = new XAException(e.getMessage());
+        xae.errorCode = XAException.XAER_RMFAIL;
+        xae.initCause(e);
+        return xae;
+    }
+
+    public ActiveMQConnection getConnection() {
+        return connection;
+    }
+
+    public void cleanup() {
+        associatedXid = null;
+        transactionId = null;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * An event when the number of consumers on a given destination changes.
+ * 
+ * @version $Revision: 564057 $
+ */
+public abstract class ConsumerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+    private final Destination destination;
+    private final ConsumerId consumerId;
+    private final int consumerCount;
+
+    public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId consumerId, int consumerCount) {
+        super(source);
+        this.destination = destination;
+        this.consumerId = consumerId;
+        this.consumerCount = consumerCount;
+    }
+
+    public ConsumerEventSource getAdvisor() {
+        return (ConsumerEventSource) getSource();
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Returns the current number of consumers active at the time this advisory was sent.
+     * 
+     * Note that this is not the number of consumers active when the consumer started consuming.
+     * It is usually more vital to know how many consumers there are now - rather than historically
+     * how many there were when a consumer started. So if you create a {@link ConsumerListener}
+     * after many consumers have started, you will receive a ConsumerEvent for each consumer. However the
+     * {@link #getConsumerCount()} method will always return the current active consumer count on each event.
+     */
+    public int getConsumerCount() {
+        return consumerCount;
+    }
+
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public abstract boolean isStarted();
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An object which can be used to listen to the number of active consumers
+ * available on a given destination.
+ * 
+ * @version $Revision: 669263 $
+ */
+public class ConsumerEventSource implements Service, MessageListener {
+    private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    private ConsumerListener listener;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicInteger consumerCount = new AtomicInteger();
+    private Session session;
+    private ActiveMQMessageConsumer consumer;
+
+    public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    public void setConsumerListener(ConsumerListener listener) {
+        this.listener = listener;
+    }
+    
+    public String getConsumerId() {
+        return consumer != null ? consumer.getConsumerId().toString() : "NOT_SET";
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
+            consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            Object command = activeMessage.getDataStructure();
+            int count = 0;
+            if (command instanceof ConsumerInfo) {
+                count = consumerCount.incrementAndGet();
+                count = extractConsumerCountFromMessage(message, count);
+                fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count));
+            } else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo)command;
+                if (removeInfo.isConsumerRemove()) {
+                    count = consumerCount.decrementAndGet();
+                    count = extractConsumerCountFromMessage(message, count);
+                    fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count));
+                }
+            } else {
+                LOG.warn("Unknown command: " + command);
+            }
+        } else {
+            LOG.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    /**
+     * Lets rely by default on the broker telling us what the consumer count is
+     * as it can ensure that we are up to date at all times and have not
+     * received messages out of order etc.
+     */
+    protected int extractConsumerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("consumerCount");
+            if (value instanceof Number) {
+                Number n = (Number)value;
+                return n.intValue();
+            }
+            LOG.warn("No consumerCount header available on the message: " + message);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
+        }
+        return count;
+    }
+
+    protected void fireConsumerEvent(ConsumerEvent event) {
+        if (listener != null) {
+            listener.onConsumerEvent(event);
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+/**
+ * Listen to the changes in the number of active consumers available for a given destination.
+ * 
+ * @version $Revision: 564271 $
+ */
+public interface ConsumerListener {
+
+    void onConsumerEvent(ConsumerEvent event);
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+
+/**
+ * An event when a new consumer has started.
+ * 
+ * @version $Revision: 564271 $
+ */
+public class ConsumerStartedEvent extends ConsumerEvent {
+
+    private static final long serialVersionUID = 5088138839609391074L;
+
+    private final  transient ConsumerInfo consumerInfo;
+
+    public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) {
+        super(source, destination, consumerInfo.getConsumerId(), count);
+        this.consumerInfo = consumerInfo;
+    }
+
+    public boolean isStarted() {
+        return true;
+    }
+
+    /**
+     * @return details of the subscription
+     */
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * An event generated when a consumer stops.
+ * 
+ * @version $Revision: 563921 $
+ */
+public class ConsumerStoppedEvent extends ConsumerEvent {
+
+    private static final long serialVersionUID = 5378835541037193206L;
+
+    public ConsumerStoppedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerId consumerId, int count) {
+        super(source, destination, consumerId, count);
+    }
+
+    public boolean isStarted() {
+        return false;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * An event caused when a destination is created or deleted
+ *
+ * @version $Revision: 634277 $
+ */
+public class DestinationEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+    private DestinationInfo destinationInfo;
+
+    public DestinationEvent(DestinationSource source, DestinationInfo destinationInfo) {
+        super(source);
+        this.destinationInfo = destinationInfo;
+    }
+
+    public ActiveMQDestination getDestination() {
+        return getDestinationInfo().getDestination();
+    }
+
+    public boolean isAddOperation() {
+        return getDestinationInfo().isAddOperation();
+    }
+
+    public long getTimeout() {
+        return getDestinationInfo().getTimeout();
+    }
+
+    public boolean isRemoveOperation() {
+        return getDestinationInfo().isRemoveOperation();
+    }
+
+    public DestinationInfo getDestinationInfo() {
+        return destinationInfo;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+/**
+ * Listen to the changes in destinations being created or destroyed
+ *
+ * @version $Revision: 634277 $
+ */
+public interface DestinationListener {
+    void onDestinationEvent(DestinationEvent event);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
+ * being created or deleted.
+ *
+ * @version $Revision: 681153 $
+ */
+public class DestinationSource implements MessageListener {
+    private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private final Connection connection;
+    private Session session;
+    private MessageConsumer queueConsumer;
+    private MessageConsumer topicConsumer;
+    private MessageConsumer tempTopicConsumer;
+    private MessageConsumer tempQueueConsumer;
+    private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
+    private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
+    private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
+    private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
+    private DestinationListener listener;
+
+    public DestinationSource(Connection connection) throws JMSException {
+        this.connection = connection;
+    }
+
+    public DestinationListener getListener() {
+        return listener;
+    }
+
+    public void setDestinationListener(DestinationListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Returns the current queues available on the broker
+     */
+    public Set<ActiveMQQueue> getQueues() {
+        return queues;
+    }
+
+    /**
+     * Returns the current topics on the broker
+     */
+    public Set<ActiveMQTopic> getTopics() {
+        return topics;
+    }
+
+    /**
+     * Returns the current temporary topics available on the broker
+     */
+    public Set<ActiveMQTempQueue> getTemporaryQueues() {
+        return temporaryQueues;
+    }
+
+    /**
+     * Returns the current temporary queues available on the broker
+     */
+    public Set<ActiveMQTempTopic> getTemporaryTopics() {
+        return temporaryTopics;
+    }
+
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+            queueConsumer.setMessageListener(this);
+
+            topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
+            topicConsumer.setMessageListener(this);
+
+            tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
+            tempQueueConsumer.setMessageListener(this);
+
+            tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
+            tempTopicConsumer.setMessageListener(this);
+        }
+    }
+
+    public void stop() throws JMSException {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+            Object command = activeMessage.getDataStructure();
+            if (command instanceof DestinationInfo) {
+                DestinationInfo destinationInfo = (DestinationInfo) command;
+                DestinationEvent event = new DestinationEvent(this, destinationInfo);
+                fireDestinationEvent(event);
+            }
+            else {
+                LOG.warn("Unknown dataStructure: " + command);
+            }
+        }
+        else {
+            LOG.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    protected void fireDestinationEvent(DestinationEvent event) {
+        // now lets update the data structures
+        ActiveMQDestination destination = event.getDestination();
+        boolean add = event.isAddOperation();
+        if (destination instanceof ActiveMQQueue) {
+            ActiveMQQueue queue = (ActiveMQQueue) destination;
+            if (add) {
+                queues.add(queue);
+            }
+            else {
+                queues.remove(queue);
+            }
+        }
+        else if (destination instanceof ActiveMQTopic) {
+            ActiveMQTopic topic = (ActiveMQTopic) destination;
+            if (add) {
+                topics.add(topic);
+            }
+            else {
+                topics.remove(topic);
+            }
+        }
+        else if (destination instanceof ActiveMQTempQueue) {
+            ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
+            if (add) {
+                temporaryQueues.add(queue);
+            }
+            else {
+                temporaryQueues.remove(queue);
+            }
+        }
+        else if (destination instanceof ActiveMQTempTopic) {
+            ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
+            if (add) {
+                temporaryTopics.add(topic);
+            }
+            else {
+                temporaryTopics.remove(topic);
+            }
+        }
+        else {
+            LOG.warn("Unknown destination type: " + destination);
+        }
+        if (listener != null) {
+            listener.onDestinationEvent(event);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ProducerId;
+
+/**
+ * An event when the number of producers on a given destination changes.
+ * 
+ * @version $Revision: 359679 $
+ */
+public abstract class ProducerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+    private final Destination destination;
+    private final ProducerId producerId;
+    private final int producerCount;
+
+    public ProducerEvent(ProducerEventSource source, Destination destination, ProducerId producerId, int producerCount) {
+        super(source);
+        this.destination = destination;
+        this.producerId = producerId;
+        this.producerCount = producerCount;
+    }
+
+    public ProducerEventSource getAdvisor() {
+        return (ProducerEventSource) getSource();
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Returns the current number of producers active at the time this advisory was sent.
+     * 
+     */
+    public int getProducerCount() {
+        return producerCount;
+    }
+
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public abstract boolean isStarted();
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An object which can be used to listen to the number of active consumers
+ * available on a given destination.
+ * 
+ * @version $Revision: 359679 $
+ */
+public class ProducerEventSource implements Service, MessageListener {
+    private static final Log LOG = LogFactory.getLog(ProducerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    private ProducerListener listener;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicInteger producerCount = new AtomicInteger();
+    private Session session;
+    private MessageConsumer consumer;
+
+    public ProducerEventSource(Connection connection, Destination destination) throws JMSException {
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    public void setProducerListener(ProducerListener listener) {
+        this.listener = listener;
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination);
+            consumer = session.createConsumer(advisoryTopic);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            Object command = activeMessage.getDataStructure();
+            int count = 0;
+            if (command instanceof ProducerInfo) {
+                count = producerCount.incrementAndGet();
+                count = extractProducerCountFromMessage(message, count);
+                fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count));
+            } else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo)command;
+                if (removeInfo.isProducerRemove()) {
+                    count = producerCount.decrementAndGet();
+                    count = extractProducerCountFromMessage(message, count);
+                    fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count));
+                }
+            } else {
+                LOG.warn("Unknown command: " + command);
+            }
+        } else {
+            LOG.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    protected int extractProducerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("producerCount");
+            if (value instanceof Number) {
+                Number n = (Number)value;
+                return n.intValue();
+            }
+            LOG.warn("No producerCount header available on the message: " + message);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
+        }
+        return count;
+    }
+
+    protected void fireProducerEvent(ProducerEvent event) {
+        if (listener != null) {
+            listener.onProducerEvent(event);
+        }
+    }
+
+}