You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC
svn commit: r781177 [9/11] - in /activemq/sandbox/activemq-flow:
activemq-bio/ activemq-bio/src/main/java/org/
activemq-bio/src/main/java/org/apache/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transpo...
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * A listener which is notified if a message is available for processing via the
+ * receive methods. Typically on receiving this notification you can call
+ * {@link MessageConsumer#receiveNoWait()} to get the new message immediately.
+ *
+ * Note that this notification just indicates a message is available for synchronous consumption;
+ * it does not actually consume the message.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface MessageAvailableListener {
+
+ /**
+ * Signals that a message is available for synchronous consumption.
+ *
+ * @param consumer the consumer handed to the listener with the notification;
+ * per the interface description, its receive methods can be used
+ * to fetch the message
+ */
+ void onMessageAvailable(MessageConsumer consumer);
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.MessageDispatch;
+
+public class MessageDispatchChannel {
+
+ private final Object mutex = new Object();
+ private final LinkedList<MessageDispatch> list;
+ private boolean closed;
+ private boolean running;
+
+ public MessageDispatchChannel() {
+ this.list = new LinkedList<MessageDispatch>();
+ }
+
+ public void enqueue(MessageDispatch message) {
+ synchronized (mutex) {
+ list.addLast(message);
+ mutex.notify();
+ }
+ }
+
+ public void enqueueFirst(MessageDispatch message) {
+ synchronized (mutex) {
+ list.addFirst(message);
+ mutex.notify();
+ }
+ }
+
+ public boolean isEmpty() {
+ synchronized (mutex) {
+ return list.isEmpty();
+ }
+ }
+
+ /**
+ * Used to get an enqueued message. The amount of time this method blocks is
+ * based on the timeout value. - if timeout==-1 then it blocks until a
+ * message is received. - if timeout==0 then it it tries to not block at
+ * all, it returns a message if it is available - if timeout>0 then it
+ * blocks up to timeout amount of time. Expired messages will consumed by
+ * this method.
+ *
+ * @throws JMSException
+ * @return null if we timeout or if the consumer is closed.
+ * @throws InterruptedException
+ */
+ public MessageDispatch dequeue(long timeout) throws InterruptedException {
+ synchronized (mutex) {
+ // Wait until the consumer is ready to deliver messages.
+ while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
+ if (timeout == -1) {
+ mutex.wait();
+ } else {
+ mutex.wait(timeout);
+ break;
+ }
+ }
+ if (closed || !running || list.isEmpty()) {
+ return null;
+ }
+ return list.removeFirst();
+ }
+ }
+
+ public MessageDispatch dequeueNoWait() {
+ synchronized (mutex) {
+ if (closed || !running || list.isEmpty()) {
+ return null;
+ }
+ return list.removeFirst();
+ }
+ }
+
+ public MessageDispatch peek() {
+ synchronized (mutex) {
+ if (closed || !running || list.isEmpty()) {
+ return null;
+ }
+ return list.getFirst();
+ }
+ }
+
+ public void start() {
+ synchronized (mutex) {
+ running = true;
+ mutex.notifyAll();
+ }
+ }
+
+ public void stop() {
+ synchronized (mutex) {
+ running = false;
+ mutex.notifyAll();
+ }
+ }
+
+ public void close() {
+ synchronized (mutex) {
+ if (!closed) {
+ running = false;
+ closed = true;
+ }
+ mutex.notifyAll();
+ }
+ }
+
+ public void clear() {
+ synchronized (mutex) {
+ list.clear();
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public int size() {
+ synchronized (mutex) {
+ return list.size();
+ }
+ }
+
+ public Object getMutex() {
+ return mutex;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public List<MessageDispatch> removeAll() {
+ synchronized (mutex) {
+ ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
+ list.clear();
+ return rc;
+ }
+ }
+
+ public String toString() {
+ synchronized (mutex) {
+ return list.toString();
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * A plugin strategy for transforming a message before it is sent by the JMS client or before it is
+ * dispatched to the JMS consumer.
+ *
+ * @version $Revision: 564271 $
+ */
+public interface MessageTransformer {
+
+ /**
+ * Transforms the given message inside the producer before it is sent to the JMS bus.
+ *
+ * @param session the session supplied with the call
+ * @param producer the producer supplied with the call
+ * @param message the message to transform
+ * @return the transformed message. NOTE(review): whether null or the same
+ * instance may be returned is not visible here; confirm against callers.
+ * @throws JMSException declared by the signature; conditions are
+ * implementation-specific
+ */
+ Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
+
+ /**
+ * Transforms the given message inside the consumer before being dispatched to the client code.
+ *
+ * @param session the session supplied with the call
+ * @param consumer the consumer supplied with the call
+ * @param message the message to transform
+ * @return the transformed message. NOTE(review): whether null or the same
+ * instance may be returned is not visible here; confirm against callers.
+ * @throws JMSException declared by the signature; conditions are
+ * implementation-specific
+ */
+ Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A useful base class for message transformers.
+ *
+ * @version $Revision: 563921 $
+ */
+public abstract class MessageTransformerSupport implements MessageTransformer {
+
+ /**
+ * Copies the standard JMS and user defined properties from the given message to the specified message.
+ * Delegates to {@link ActiveMQMessageTransformation#copyProperties(Message, Message)}.
+ *
+ * @param fromMessage the message to take the properties from
+ * @param toMesage the message to add the properties to
+ * @throws JMSException if the delegate call fails
+ */
+ protected void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
+ ActiveMQMessageTransformation.copyProperties(fromMessage, toMesage);
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when an operation is invoked on a service
+ * which has not yet been started.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class NotStartedException extends IllegalStateException {
+
+ private static final long serialVersionUID = -4907909323529887659L;
+
+ /**
+ * Creates the exception with a fixed reason text and the fixed string
+ * "AMQ-1003" as the second (error code) argument of the
+ * {@link IllegalStateException#IllegalStateException(String, String)}
+ * constructor.
+ */
+ public NotStartedException() {
+ super("IllegalState: This service has not yet been started", "AMQ-1003");
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+import java.util.Random;
+
+/**
+ * Configuration options used to control how messages are re-delivered when they
+ * are rolled back.
+ *
+ * @org.apache.xbean.XBean element="redeliveryPolicy"
+ * @version $Revision: 1.11 $
+ */
+public class RedeliveryPolicy implements Cloneable, Serializable {
+
+    public static final int NO_MAXIMUM_REDELIVERIES = -1;
+    private static Random randomNumberGenerator;
+
+    // +/-15% for a 30% spread -cgs
+    private double collisionAvoidanceFactor = 0.15d;
+    private int maximumRedeliveries = 6;
+    private long initialRedeliveryDelay = 1000L;
+    private boolean useCollisionAvoidance;
+    private boolean useExponentialBackOff;
+    private short backOffMultiplier = 5;
+
+    public RedeliveryPolicy() {
+    }
+
+    /**
+     * @return a field-by-field copy of this policy, produced via
+     *         {@link Object#clone()}.
+     */
+    public RedeliveryPolicy copy() {
+        try {
+            return (RedeliveryPolicy)clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Could not clone: " + e, e);
+        }
+    }
+
+    public short getBackOffMultiplier() {
+        return backOffMultiplier;
+    }
+
+    public void setBackOffMultiplier(short backOffMultiplier) {
+        this.backOffMultiplier = backOffMultiplier;
+    }
+
+    /**
+     * @return the collision avoidance factor expressed as a whole percentage
+     *         (for example 15 for the default factor of 0.15).
+     */
+    public short getCollisionAvoidancePercent() {
+        return (short)Math.round(collisionAvoidanceFactor * 100);
+    }
+
+    /**
+     * @param collisionAvoidancePercent the maximum +/- variation, in percent,
+     *        applied to a delay when collision avoidance is enabled
+     */
+    public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+        this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+    }
+
+    public long getInitialRedeliveryDelay() {
+        return initialRedeliveryDelay;
+    }
+
+    public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
+        this.initialRedeliveryDelay = initialRedeliveryDelay;
+    }
+
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    /**
+     * Computes the next redelivery delay from the previous one.
+     * <ul>
+     * <li>A previous delay of 0 yields the initial redelivery delay.</li>
+     * <li>With exponential back-off enabled and a multiplier greater than 1,
+     * the previous delay is multiplied by the multiplier. The product is
+     * capped at {@link Long#MAX_VALUE} instead of silently overflowing.</li>
+     * <li>Otherwise the previous delay is reused.</li>
+     * </ul>
+     * If collision avoidance is enabled, the result is then varied randomly
+     * by up to +/- the collision avoidance factor.
+     *
+     * @param previousDelay the delay used for the previous redelivery, or 0
+     *        for the first one
+     * @return the delay to use for the next redelivery
+     */
+    public long getRedeliveryDelay(long previousDelay) {
+        long redeliveryDelay;
+
+        if (previousDelay == 0) {
+            redeliveryDelay = initialRedeliveryDelay;
+        } else if (useExponentialBackOff && backOffMultiplier > 1) {
+            redeliveryDelay = multiplySaturated(previousDelay, backOffMultiplier);
+        } else {
+            redeliveryDelay = previousDelay;
+        }
+
+        if (useCollisionAvoidance) {
+            /*
+             * First random determines +/-, second random determines how far to
+             * go in that direction. -cgs
+             */
+            Random random = getRandomNumberGenerator();
+            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
+            redeliveryDelay += redeliveryDelay * variance;
+        }
+
+        return redeliveryDelay;
+    }
+
+    /**
+     * Multiplies a delay by a factor greater than 1, returning
+     * {@link Long#MAX_VALUE} when a positive delay would overflow.
+     */
+    private static long multiplySaturated(long delay, long factor) {
+        if (delay > 0 && delay > Long.MAX_VALUE / factor) {
+            return Long.MAX_VALUE;
+        }
+        return delay * factor;
+    }
+
+    public boolean isUseCollisionAvoidance() {
+        return useCollisionAvoidance;
+    }
+
+    public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+        this.useCollisionAvoidance = useCollisionAvoidance;
+    }
+
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
+    /**
+     * @return the shared random number generator, created on first use.
+     */
+    protected static synchronized Random getRandomNumberGenerator() {
+        if (randomNumberGenerator == null) {
+            randomNumberGenerator = new Random();
+        }
+        return randomNumberGenerator;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * The core lifecycle interface for ActiveMQ components.
+ *
+ * If there was a standard way to do so, it'd be good to register this
+ * interface with Spring so it treats the start/stop methods as those of
+ * {@link org.springframework.beans.factory.InitializingBean}
+ * and {@link org.springframework.beans.factory.DisposableBean}
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface Service {
+
+ /**
+ * Starts this component.
+ *
+ * @throws Exception declared by the signature; failure conditions are
+ * implementation-specific
+ */
+ void start() throws Exception;
+
+ /**
+ * Stops this component.
+ *
+ * @throws Exception declared by the signature; failure conditions are
+ * implementation-specific
+ */
+ void stop() throws Exception;
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * The StreamConnection interface allows you to send and receive data from a
+ * Destination using standard java InputStream and OutputStream objects. Its
+ * best use case is to send and receive large amounts of data that would be too
+ * large to hold in a single JMS message.
+ *
+ * @version $Revision$
+ */
+public interface StreamConnection extends Connection {
+
+ // Input-stream factories: each overload adds one more argument
+ // (messageSelector, then noLocal) to the previous one.
+ // NOTE(review): selector/noLocal presumably follow the usual JMS consumer
+ // semantics; confirm against the implementing class.
+ InputStream createInputStream(Destination dest) throws JMSException;
+
+ InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
+
+ InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
+
+ // Durable variants take a Topic and a subscription name.
+ InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
+
+ InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
+
+ InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
+
+ // Output-stream factories.
+ OutputStream createOutputStream(Destination dest) throws JMSException;
+
+ OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
+
+ /**
+ * Unsubscribes a durable subscription that has been created by a client.
+ * <P>
+ * This method deletes the state being maintained on behalf of the
+ * subscriber by its provider.
+ * <P>
+ * It is erroneous for a client to delete a durable subscription while there
+ * is an active <CODE>MessageConsumer </CODE> or
+ * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
+ * message is part of a pending transaction or has not been acknowledged in
+ * the session.
+ *
+ * @param name the name used to identify this subscription
+ * @throws JMSException if the session fails to unsubscribe to the durable
+ * subscription due to some internal error.
+ * @throws InvalidDestinationException if an invalid subscription name is
+ * specified.
+ * @since 1.1
+ */
+ void unsubscribe(String name) throws JMSException;
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+* A holder for different thread priorities used in ActiveMQ.
+* NOTE(review): the values are presumably passed to
+* {@link Thread#setPriority(int)}; confirm against the code that reads them.
+*
+* @version $Revision: 1.9 $
+*/
+
+public interface ThreadPriorities {
+ int INBOUND_BROKER_CONNECTION = 6;
+ int OUT_BOUND_BROKER_DISPATCH = 6;
+ int INBOUND_CLIENT_CONNECTION = 7;
+ int INBOUND_CLIENT_SESSION = 7;
+ int BROKER_MANAGEMENT = 9;
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.IntegerResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A TransactionContext provides the means to control a JMS transaction. It
+ * provides a local transaction interface and also an XAResource interface. <p/>
+ * An application server controls the transactional assignment of an XASession
+ * by obtaining its XAResource. It uses the XAResource to assign the session to
+ * a transaction, prepare and commit work on the transaction, and so on. <p/> An
+ * XAResource provides some fairly sophisticated facilities for interleaving
+ * work on multiple transactions, recovering a list of transactions in progress,
+ * and so on. A JTA aware JMS provider must fully implement this functionality.
+ * This could be done by using the services of a database that supports XA, or a
+ * JMS provider may choose to implement this functionality from scratch. <p/>
+ *
+ * @version $Revision: 1.10 $
+ * @see javax.jms.Session
+ * @see javax.jms.QueueSession
+ * @see javax.jms.TopicSession
+ * @see javax.jms.XASession
+ */
+public class TransactionContext implements XAResource {
+
+ private static final Log LOG = LogFactory.getLog(TransactionContext.class);
+
+ // XATransactionId -> ArrayList of TransactionContext objects
+ private final ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
+
+ private final ActiveMQConnection connection;
+ private final LongSequenceGenerator localTransactionIdGenerator;
+ private final ConnectionId connectionId;
+ private List<Synchronization> synchronizations;
+
+ // To track XA transactions.
+ private Xid associatedXid;
+ private TransactionId transactionId;
+ private LocalTransactionEventListener localTransactionEventListener;
+
+ /**
+ * Creates a context bound to the given connection. The local transaction id
+ * generator and the connection id are read from the connection once, here,
+ * and cached for the lifetime of the context.
+ *
+ * @param connection the connection all transaction commands are sent on
+ */
+ public TransactionContext(ActiveMQConnection connection) {
+ this.connection = connection;
+ this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
+ this.connectionId = connection.getConnectionInfo().getConnectionId();
+ }
+
+    /**
+     * @return true when a transaction is currently active on this context and
+     *         its id reports itself as an XA transaction id
+     */
+    public boolean isInXATransaction() {
+        if (transactionId == null) {
+            return false;
+        }
+        return transactionId.isXATransaction();
+    }
+
+    /**
+     * @return true when a transaction is currently active on this context and
+     *         its id reports itself as a local transaction id
+     */
+    public boolean isInLocalTransaction() {
+        if (transactionId == null) {
+            return false;
+        }
+        return transactionId.isLocalTransaction();
+    }
+
+ /**
+ * @return true when any transaction id (local or XA) is currently set on
+ *         this context
+ */
+ public boolean isInTransaction() {
+ return transactionId != null;
+ }
+
+ /**
+ * @return the listener notified of local transaction begin, commit and
+ *         rollback events, or null if none has been set
+ */
+ public LocalTransactionEventListener getLocalTransactionEventListener() {
+ return localTransactionEventListener;
+ }
+
+ /**
+ * Used by the resource adapter to listen to transaction events.
+ * The listener is invoked from {@link #begin()}, {@link #commit()} and
+ * {@link #rollback()}; a null value disables the notifications.
+ *
+ * @param localTransactionEventListener The localTransactionEventListener to
+ * set.
+ */
+ public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
+ this.localTransactionEventListener = localTransactionEventListener;
+ }
+
+ // ///////////////////////////////////////////////////////////
+ //
+ // Methods that work with the Synchronization objects registered with
+ // the transaction.
+ //
+ // ///////////////////////////////////////////////////////////
+
+    /**
+     * Registers a callback that takes part in the before-end, after-commit
+     * and after-rollback notifications of this context. The backing list is
+     * created lazily on first use.
+     *
+     * @param s the synchronization to register
+     */
+    public void addSynchronization(Synchronization s) {
+        List<Synchronization> registered = synchronizations;
+        if (registered == null) {
+            registered = new ArrayList<Synchronization>(10);
+            synchronizations = registered;
+        }
+        registered.add(s);
+    }
+
+ private void afterRollback() throws JMSException {
+ if (synchronizations == null) {
+ return;
+ }
+
+ int size = synchronizations.size();
+ try {
+ for (int i = 0; i < size; i++) {
+ synchronizations.get(i).afterRollback();
+ }
+ } catch (JMSException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw JMSExceptionSupport.create(e);
+ } finally {
+ synchronizations = null;
+ }
+ }
+
+ private void afterCommit() throws JMSException {
+ if (synchronizations == null) {
+ return;
+ }
+
+ int size = synchronizations.size();
+ try {
+ for (int i = 0; i < size; i++) {
+ synchronizations.get(i).afterCommit();
+ }
+ } catch (JMSException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw JMSExceptionSupport.create(e);
+ } finally {
+ synchronizations = null;
+ }
+ }
+
+ private void beforeEnd() throws JMSException {
+ if (synchronizations == null) {
+ return;
+ }
+
+ int size = synchronizations.size();
+ try {
+ for (int i = 0; i < size; i++) {
+ synchronizations.get(i).beforeEnd();
+ }
+ } catch (JMSException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * @return the id of the transaction currently active on this context, or
+ *         null when no transaction is active
+ */
+ public TransactionId getTransactionId() {
+ return transactionId;
+ }
+
+ // ///////////////////////////////////////////////////////////
+ //
+ // Local transaction interface.
+ //
+ // ///////////////////////////////////////////////////////////
+
+    /**
+     * Starts a local transaction unless one is already running, in which case
+     * the call does nothing. Any synchronizations left over from an earlier
+     * transaction are dropped, a fresh local transaction id is allocated and
+     * a BEGIN command is sent asynchronously.
+     *
+     * @throws javax.jms.JMSException on internal error, or a
+     *         TransactionInProgressException when an XA transaction is active
+     */
+    public void begin() throws JMSException {
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress.");
+        }
+
+        // A local transaction is already running: nothing to do.
+        if (transactionId != null) {
+            return;
+        }
+
+        synchronizations = null;
+        this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
+        final TransactionInfo beginCommand = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
+        this.connection.ensureConnectionInfoSent();
+        this.connection.asyncSendPacket(beginCommand);
+
+        // Notify the listener that the tx was started.
+        if (localTransactionEventListener != null) {
+            localTransactionEventListener.beginEvent();
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Begin:" + transactionId);
+        }
+    }
+
+    /**
+     * Rolls back the work done in the current local transaction.
+     * <p>
+     * The before-end callbacks run first. If a transaction is active its id
+     * is cleared, a ROLLBACK command is sent asynchronously and the local
+     * transaction listener is notified. The after-rollback callbacks run in
+     * every case, whether or not a transaction was active.
+     *
+     * @throws JMSException if the JMS provider fails to roll back the
+     *         transaction due to some internal error, or a
+     *         TransactionInProgressException when an XA transaction is active
+     */
+    public void rollback() throws JMSException {
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
+        }
+
+        beforeEnd();
+
+        final TransactionId current = transactionId;
+        if (current != null) {
+            if (LOG.isDebugEnabled()) {
+                final int syncCount = synchronizations != null ? synchronizations.size() : 0;
+                LOG.debug("Rollback: " + current + " syncCount: " + syncCount);
+            }
+
+            final TransactionInfo rollbackCommand = new TransactionInfo(getConnectionId(), current, TransactionInfo.ROLLBACK);
+            this.transactionId = null;
+            this.connection.asyncSendPacket(rollbackCommand);
+
+            // Notify the listener that the tx was rolled back
+            if (localTransactionEventListener != null) {
+                localTransactionEventListener.rollbackEvent();
+            }
+        }
+
+        afterRollback();
+    }
+
+    /**
+     * Commits all work done in the current local transaction.
+     * <p>
+     * The before-end callbacks run first. If a transaction is active, its id
+     * is cleared and a one-phase COMMIT command is sent synchronously. On
+     * success the local transaction listener receives a commit event and the
+     * after-commit callbacks run. If sending the commit fails, the listener
+     * receives a rollback event and the after-rollback callbacks run, so no
+     * participant is left believing the transaction is still open; the
+     * original failure is then re-thrown.
+     *
+     * @throws JMSException if the JMS provider fails to commit the transaction
+     *         due to some internal error, or a TransactionInProgressException
+     *         when an XA transaction is active
+     */
+    public void commit() throws JMSException {
+        if (isInXATransaction()) {
+            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
+        }
+
+        beforeEnd();
+
+        // Only send commit if the transaction was started.
+        if (transactionId != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit: " + transactionId
+                          + " syncCount: "
+                          + (synchronizations != null ? synchronizations.size() : 0));
+            }
+
+            final TransactionId committing = transactionId;
+            TransactionInfo info = new TransactionInfo(getConnectionId(), committing, TransactionInfo.COMMIT_ONE_PHASE);
+            // The id is cleared before the round trip, so this context no longer
+            // reports an active transaction whatever the outcome of the send.
+            this.transactionId = null;
+            try {
+                this.connection.syncSendPacket(info);
+            } catch (JMSException cause) {
+                // Logged here because a failing callback below would otherwise hide it.
+                LOG.info("commit failed for transaction " + committing, cause);
+                if (localTransactionEventListener != null) {
+                    localTransactionEventListener.rollbackEvent();
+                }
+                afterRollback();
+                throw cause;
+            }
+
+            // Notify the listener that the tx was committed
+            if (localTransactionEventListener != null) {
+                localTransactionEventListener.commitEvent();
+            }
+            afterCommit();
+        }
+    }
+
+ // ///////////////////////////////////////////////////////////
+ //
+ // XAResource Implementation
+ //
+ // ///////////////////////////////////////////////////////////
+    /**
+     * Associates a transaction with the resource.
+     * <p>
+     * Rejected with XAER_PROTO when a local transaction is active or when
+     * this context is already associated with an xid. The flags are not
+     * inspected.
+     *
+     * @param xid the transaction branch to associate with
+     * @param flags currently ignored
+     * @throws XAException XAER_PROTO as described above, or whatever
+     *         associating the xid raises
+     */
+    public void start(Xid xid, int flags) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start: " + xid);
+        }
+
+        // Neither a running local transaction nor an existing association is allowed.
+        final boolean busy = isInLocalTransaction() || associatedXid != null;
+        if (busy) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        // TODO: for TMJOIN verify that the server has seen the xid.
+        // TODO: for TMRESUME verify that the xid was suspended.
+
+        // associate
+        synchronizations = null;
+        setXid(xid);
+    }
+
+ /**
+ * Looks the id up on the connection each time it is called, rather than
+ * using the value cached in the constructor.
+ *
+ * @return connectionId for connection
+ */
+ private ConnectionId getConnectionId() {
+ return connection.getConnectionInfo().getConnectionId();
+ }
+
+    /**
+     * Ends the association between this context and the given xid.
+     * <p>
+     * With TMSUSPEND or TMFAIL the xid must be the associated one, otherwise
+     * XAER_PROTO is raised. With TMSUCCESS a non-associated xid is silently
+     * accepted (it may be an asynchronous success call). Any other flag value
+     * is rejected with XAER_INVAL. When the xid is the associated one the
+     * before-end callbacks run and the association is cleared.
+     *
+     * @param xid the transaction branch being ended
+     * @param flags one of TMSUCCESS, TMFAIL or TMSUSPEND
+     * @throws XAException XAER_PROTO / XAER_INVAL as described above, or the
+     *         converted failure of a before-end callback
+     */
+    public void end(Xid xid, int flags) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("End: " + xid);
+        }
+
+        if (isInLocalTransaction()) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        final boolean suspendOrFail = (flags & (TMSUSPEND | TMFAIL)) != 0;
+        final boolean success = (flags & TMSUCCESS) == TMSUCCESS;
+        if (!suspendOrFail && !success) {
+            throw new XAException(XAException.XAER_INVAL);
+        }
+
+        if (!equals(associatedXid, xid)) {
+            if (suspendOrFail) {
+                // You can only suspend the associated xid.
+                throw new XAException(XAException.XAER_PROTO);
+            }
+            // TMSUCCESS for some other xid: this could be an asynchronous success call.
+            return;
+        }
+
+        // TODO: we may want to put the xid in a suspended list.
+        try {
+            beforeEnd();
+        } catch (JMSException callbackFailure) {
+            throw toXAException(callbackFailure);
+        }
+        setXid(null);
+    }
+
+    /**
+     * Compares two xids by value: format id, branch qualifier and global
+     * transaction id. Two null references are considered equal.
+     *
+     * @param xid1 first xid, may be null
+     * @param xid2 second xid, may be null
+     * @return true when both are the same reference or carry equal values
+     */
+    private boolean equals(Xid xid1, Xid xid2) {
+        if (xid1 == xid2) {
+            return true;
+        }
+        // Both-null was handled above, so a null here means exactly one side is missing.
+        if (xid1 == null || xid2 == null) {
+            return false;
+        }
+        if (xid1.getFormatId() != xid2.getFormatId()) {
+            return false;
+        }
+        if (!Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())) {
+            return false;
+        }
+        return Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
+    }
+
+    /**
+     * Asks the server to prepare the given transaction branch and returns the
+     * server's vote.
+     * <p>
+     * Interleaving of multiple transactions is allowed, so the xid does not
+     * have to be one this context knows about. It must however not be the xid
+     * that is still associated, because end(xid, TMSUCCESS) has to be called
+     * before prepare.
+     *
+     * @param xid the transaction branch to prepare
+     * @return the integer result carried by the server's response
+     * @throws XAException XAER_PROTO for a null or still associated xid, or
+     *         the converted failure of the request
+     */
+    public int prepare(Xid xid) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Prepare: " + xid);
+        }
+
+        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
+        // called first
+        final boolean unusable = xid == null || equals(associatedXid, xid);
+        if (unusable) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        // TODO: cache the known xids so we don't keep recreating this one??
+        final XATransactionId branchId = new XATransactionId(xid);
+
+        try {
+            final TransactionInfo prepareCommand = new TransactionInfo(getConnectionId(), branchId, TransactionInfo.PREPARE);
+
+            // Find out if the server wants to commit or rollback.
+            final IntegerResponse vote = (IntegerResponse)this.connection.syncSendPacket(prepareCommand);
+            return vote.getResult();
+        } catch (JMSException sendFailure) {
+            throw toXAException(sendFailure);
+        }
+    }
+
+    /**
+     * Rolls back the given transaction branch on the server and then runs the
+     * after-rollback callbacks of every context that registered itself for
+     * this branch when it ended.
+     * <p>
+     * Interleaving of multiple transactions is allowed, so the xid does not
+     * have to be the associated one.
+     *
+     * @param xid the transaction branch to roll back
+     * @throws XAException XAER_PROTO for a null xid, or the converted failure
+     *         of the request or of a callback
+     */
+    public void rollback(Xid xid) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rollback: " + xid);
+        }
+
+        if (xid == null) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        // I think the associated case can happen even without an end(xid)
+        // call. Need to check spec.
+        final XATransactionId branchId = equals(associatedXid, xid)
+            ? (XATransactionId)transactionId
+            : new XATransactionId(xid);
+
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            // Let the server know that the tx is rollback.
+            final TransactionInfo rollbackCommand = new TransactionInfo(getConnectionId(), branchId, TransactionInfo.ROLLBACK);
+            this.connection.syncSendPacket(rollbackCommand);
+
+            final List<TransactionContext> ended = ENDED_XA_TRANSACTION_CONTEXTS.remove(branchId);
+            if (ended != null) {
+                for (TransactionContext participant : ended) {
+                    participant.afterRollback();
+                }
+            }
+        } catch (JMSException failure) {
+            throw toXAException(failure);
+        }
+    }
+
+    /**
+     * XAResource commit: commits the given transaction branch on the server
+     * (one-phase or two-phase) and then runs the after-commit callbacks of
+     * every context that registered itself for this branch when it ended.
+     * <p>
+     * Interleaving of multiple transactions is allowed, but the xid must not
+     * be the one that is still associated: end(xid, TMSUCCESS) has to be
+     * called before commit.
+     *
+     * @param xid the transaction branch to commit
+     * @param onePhase true to send a one-phase commit, false for two-phase
+     * @throws XAException XAER_PROTO for a null or still associated xid, or
+     *         the converted failure of the request or of a callback
+     */
+    public void commit(Xid xid, boolean onePhase) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Commit: " + xid);
+        }
+
+        // should never happen, end(xid,TMSUCCESS) must have been previously
+        // called
+        final boolean unusable = xid == null || equals(associatedXid, xid);
+        if (unusable) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+        final XATransactionId branchId = new XATransactionId(xid);
+
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            // Notify the server that the tx was committed back
+            final byte commitType = onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE;
+            final TransactionInfo commitCommand = new TransactionInfo(getConnectionId(), branchId, commitType);
+            this.connection.syncSendPacket(commitCommand);
+
+            final List<TransactionContext> ended = ENDED_XA_TRANSACTION_CONTEXTS.remove(branchId);
+            if (ended != null) {
+                for (TransactionContext participant : ended) {
+                    participant.afterCommit();
+                }
+            }
+        } catch (JMSException failure) {
+            throw toXAException(failure);
+        }
+    }
+
+    /**
+     * Tells the server to forget about the given transaction branch.
+     * <p>
+     * Interleaving of multiple transactions is allowed, so the xid does not
+     * have to be the associated one.
+     *
+     * @param xid the transaction branch to forget
+     * @throws XAException XAER_PROTO for a null xid, or the converted failure
+     *         of the request
+     */
+    public void forget(Xid xid) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Forget: " + xid);
+        }
+
+        if (xid == null) {
+            throw new XAException(XAException.XAER_PROTO);
+        }
+
+        // TODO determine if the associated case can happen... I think not.
+        final XATransactionId branchId = equals(associatedXid, xid)
+            ? (XATransactionId)transactionId
+            : new XATransactionId(xid);
+
+        final TransactionInfo forgetCommand = new TransactionInfo(getConnectionId(), branchId, TransactionInfo.FORGET);
+        try {
+            // Tell the server to forget the transaction.
+            this.connection.syncSendPacket(forgetCommand);
+        } catch (JMSException sendFailure) {
+            throw toXAException(sendFailure);
+        }
+    }
+
+    /**
+     * Decides whether the given resource talks to the same resource manager
+     * as this one. Only other TransactionContext instances can match; they
+     * match when both connections report an equal resource manager id.
+     *
+     * @param xaResource the resource to compare with, may be null
+     * @return true when both resources share a resource manager id
+     * @throws XAException when a resource manager id cannot be obtained
+     */
+    public boolean isSameRM(XAResource xaResource) throws XAException {
+        // instanceof is false for null, so this guard also rejects a missing resource.
+        if (!(xaResource instanceof TransactionContext)) {
+            return false;
+        }
+
+        final TransactionContext other = (TransactionContext)xaResource;
+        try {
+            final String mine = getResourceManagerId();
+            return mine.equals(other.getResourceManagerId());
+        } catch (Throwable lookupFailure) {
+            final XAException wrapped = new XAException("Could not get resource manager id.");
+            wrapped.initCause(lookupFailure);
+            throw wrapped;
+        }
+    }
+
+    /**
+     * Asks the server for the transaction branches it knows about.
+     * <p>
+     * The flag is only logged; it does not influence the request. A response
+     * that carries no data array is reported as an empty result instead of
+     * failing with a NullPointerException.
+     *
+     * @param flag the recovery scan flag, currently unused
+     * @return the xids reported by the server, never null
+     * @throws XAException the converted failure of the request
+     */
+    public Xid[] recover(int flag) throws XAException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recover: " + flag);
+        }
+
+        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
+        try {
+            this.connection.checkClosedOrFailed();
+            this.connection.ensureConnectionInfoSent();
+
+            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
+            DataStructure[] data = receipt.getData();
+            if (data == null) {
+                // Nothing reported: XAResource.recover returns zero or more xids.
+                return new XATransactionId[0];
+            }
+
+            XATransactionId[] answer;
+            if (data instanceof XATransactionId[]) {
+                answer = (XATransactionId[])data;
+            } else {
+                // Re-type the array; arraycopy checks each element's runtime type.
+                answer = new XATransactionId[data.length];
+                System.arraycopy(data, 0, answer, 0, data.length);
+            }
+            return answer;
+        } catch (JMSException e) {
+            throw toXAException(e);
+        }
+    }
+
+ /**
+ * @return always 0; this resource does not track a transaction timeout
+ */
+ public int getTransactionTimeout() throws XAException {
+ return 0;
+ }
+
+ /**
+ * The requested value is ignored.
+ *
+ * @param seconds the requested timeout, unused
+ * @return always false, i.e. the timeout was not set
+ */
+ public boolean setTransactionTimeout(int seconds) throws XAException {
+ return false;
+ }
+
+ // ///////////////////////////////////////////////////////////
+ //
+ // Helper methods.
+ //
+ // ///////////////////////////////////////////////////////////
+ /**
+ * @return the resource manager id reported by the underlying connection
+ * @throws JMSException if the connection cannot provide the id
+ */
+ private String getResourceManagerId() throws JMSException {
+ return this.connection.getResourceManagerId();
+ }
+
+ private void setXid(Xid xid) throws XAException {
+
+ try {
+ this.connection.checkClosedOrFailed();
+ this.connection.ensureConnectionInfoSent();
+ } catch (JMSException e) {
+ throw toXAException(e);
+ }
+
+ if (xid != null) {
+ // associate
+ associatedXid = xid;
+ transactionId = new XATransactionId(xid);
+
+ TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
+ try {
+ this.connection.asyncSendPacket(info);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Started XA transaction: " + transactionId);
+ }
+ } catch (JMSException e) {
+ throw toXAException(e);
+ }
+
+ } else {
+
+ if (transactionId != null) {
+ TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
+ try {
+ this.connection.syncSendPacket(info);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ended XA transaction: " + transactionId);
+ }
+ } catch (JMSException e) {
+ throw toXAException(e);
+ }
+
+ // Add our self to the list of contexts that are interested in
+ // post commit/rollback events.
+ List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+ if (l == null) {
+ l = new ArrayList<TransactionContext>(3);
+ ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+ l.add(this);
+ } else if (!l.contains(this)) {
+ l.add(this);
+ }
+ }
+
+ // dis-associate
+ associatedXid = null;
+ transactionId = null;
+ }
+ }
+
+ /**
+ * Converts a JMSException from the server to an XAException. if the
+ * JMSException contained a linked XAException that is returned instead.
+ *
+ * @param e JMSException to convert
+ * @return XAException wrapping original exception or its message
+ */
+ private XAException toXAException(JMSException e) {
+ if (e.getCause() != null && e.getCause() instanceof XAException) {
+ XAException original = (XAException)e.getCause();
+ XAException xae = new XAException(original.getMessage());
+ xae.errorCode = original.errorCode;
+ xae.initCause(original);
+ return xae;
+ }
+
+ XAException xae = new XAException(e.getMessage());
+ xae.errorCode = XAException.XAER_RMFAIL;
+ xae.initCause(e);
+ return xae;
+ }
+
+ /**
+ * @return the connection this context sends its transaction commands on
+ */
+ public ActiveMQConnection getConnection() {
+ return connection;
+ }
+
+ /**
+ * Drops the current xid association and transaction id locally. Nothing is
+ * sent to the server and registered synchronizations are left untouched.
+ */
+ public void cleanup() {
+ associatedXid = null;
+ transactionId = null;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * Base class of the events fired when the number of consumers on a given
+ * destination changes. Subclasses state through {@link #isStarted()} whether
+ * the event reports a consumer that started or one that stopped.
+ *
+ * @version $Revision: 564057 $
+ */
+public abstract class ConsumerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+
+    private final Destination destination;
+    private final ConsumerId consumerId;
+    private final int consumerCount;
+
+    /**
+     * @param source the event source that produced this event
+     * @param destination the destination the consumer is attached to
+     * @param consumerId the id of the consumer that started or stopped
+     * @param consumerCount the consumer count to report with this event
+     */
+    public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId consumerId, int consumerCount) {
+        super(source);
+        this.consumerCount = consumerCount;
+        this.consumerId = consumerId;
+        this.destination = destination;
+    }
+
+    /**
+     * @return the event source, typed as the {@link ConsumerEventSource} it
+     *         was created with
+     */
+    public ConsumerEventSource getAdvisor() {
+        final Object origin = getSource();
+        return (ConsumerEventSource) origin;
+    }
+
+    /**
+     * @return the destination whose consumer set changed
+     */
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Returns the current number of consumers active at the time this advisory was sent.
+     *
+     * Note that this is not the number of consumers active when the consumer started consuming.
+     * It is usually more vital to know how many consumers there are now - rather than historically
+     * how many there were when a consumer started. So if you create a {@link ConsumerListener}
+     * after many consumers have started, you will receive a ConsumerEvent for each consumer. However the
+     * {@link #getConsumerCount()} method will always return the current active consumer count on each event.
+     */
+    public int getConsumerCount() {
+        return consumerCount;
+    }
+
+    /**
+     * @return the id of the consumer this event is about
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    /**
+     * @return true when this event reports a started consumer, false when it
+     *         reports a stopped one
+     */
+    public abstract boolean isStarted();
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An object which can be used to listen to the number of active consumers
+ * available on a given destination.
+ * <p>
+ * Once started it consumes the consumer advisory topic of the destination and
+ * turns every advisory into a {@link ConsumerStartedEvent} or
+ * {@link ConsumerStoppedEvent} delivered to the configured
+ * {@link ConsumerListener}.
+ *
+ * @version $Revision: 669263 $
+ */
+public class ConsumerEventSource implements Service, MessageListener {
+    private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    // Written by the application, read when advisories are delivered.
+    private volatile ConsumerListener listener;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicInteger consumerCount = new AtomicInteger();
+    private Session session;
+    private ActiveMQMessageConsumer consumer;
+
+    /**
+     * @param connection the connection used to create the advisory session
+     * @param destination the destination whose consumers are observed
+     * @throws JMSException if the destination cannot be transformed into an
+     *         ActiveMQ destination
+     */
+    public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    /**
+     * @param listener the listener to notify, or null to stop notifications
+     */
+    public void setConsumerListener(ConsumerListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * @return the id of the advisory consumer, or "NOT_SET" while no consumer
+     *         has been created
+     */
+    public String getConsumerId() {
+        return consumer != null ? consumer.getConsumerId().toString() : "NOT_SET";
+    }
+
+    /**
+     * Creates the advisory session and consumer. Calling it again while
+     * started does nothing. If any step fails, the partially created session
+     * is closed and the source returns to the stopped state, so that a later
+     * start() can retry.
+     */
+    public void start() throws Exception {
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+
+        boolean ready = false;
+        try {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
+            consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic);
+            consumer.setMessageListener(this);
+            ready = true;
+        } finally {
+            if (!ready) {
+                // Undo the partial start without hiding the original failure.
+                Session failed = session;
+                session = null;
+                consumer = null;
+                started.set(false);
+                if (failed != null) {
+                    try {
+                        failed.close();
+                    } catch (JMSException ignored) {
+                        LOG.debug("Failed to close session after unsuccessful start: " + ignored, ignored);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the advisory session if the source is started; otherwise does
+     * nothing.
+     */
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Translates an advisory message into a consumer event. ConsumerInfo
+     * commands become started events, consumer-removing RemoveInfo commands
+     * become stopped events; anything else is logged and ignored.
+     */
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            Object command = activeMessage.getDataStructure();
+            int count = 0;
+            if (command instanceof ConsumerInfo) {
+                count = consumerCount.incrementAndGet();
+                count = extractConsumerCountFromMessage(message, count);
+                fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count));
+            } else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo)command;
+                if (removeInfo.isConsumerRemove()) {
+                    count = consumerCount.decrementAndGet();
+                    count = extractConsumerCountFromMessage(message, count);
+                    fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count));
+                }
+            } else {
+                LOG.warn("Unknown command: " + command);
+            }
+        } else {
+            LOG.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    /**
+     * Lets rely by default on the broker telling us what the consumer count is
+     * as it can ensure that we are up to date at all times and have not
+     * received messages out of order etc.
+     *
+     * @param message the advisory message
+     * @param count the locally tracked count, used when the message carries
+     *        no numeric "consumerCount" property
+     * @return the count from the message, or the given fallback
+     */
+    protected int extractConsumerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("consumerCount");
+            if (value instanceof Number) {
+                Number n = (Number)value;
+                return n.intValue();
+            }
+            LOG.warn("No consumerCount header available on the message: " + message);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
+        }
+        return count;
+    }
+
+    /**
+     * Hands the event to the listener, if one is set.
+     */
+    protected void fireConsumerEvent(ConsumerEvent event) {
+        // Read the field once so a concurrent setConsumerListener(null) cannot cause an NPE.
+        ConsumerListener target = listener;
+        if (target != null) {
+            target.onConsumerEvent(event);
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+/**
+ * Listen to the changes in the number of active consumers available for a given destination.
+ * Register an implementation with
+ * {@link ConsumerEventSource#setConsumerListener(ConsumerListener)}.
+ *
+ * @version $Revision: 564271 $
+ */
+public interface ConsumerListener {
+
+ /**
+ * Called for every consumer started or stopped event produced by the
+ * event source.
+ *
+ * @param event the event describing the change
+ */
+ void onConsumerEvent(ConsumerEvent event);
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+
+/**
+ * The event fired when a new consumer has started. In addition to the common
+ * event data it carries the {@link ConsumerInfo} of the new consumer.
+ *
+ * @version $Revision: 564271 $
+ */
+public class ConsumerStartedEvent extends ConsumerEvent {
+
+    private static final long serialVersionUID = 5088138839609391074L;
+
+    // Declared transient, so it is left out of default Java serialization.
+    private final transient ConsumerInfo consumerInfo;
+
+    /**
+     * @param source the event source that produced this event
+     * @param destination the destination the consumer subscribed to
+     * @param consumerInfo the details of the new consumer; also supplies the
+     *        consumer id of the event
+     * @param count the consumer count to report with this event
+     */
+    public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) {
+        super(source, destination, consumerInfo.getConsumerId(), count);
+        this.consumerInfo = consumerInfo;
+    }
+
+    /**
+     * @return details of the subscription
+     */
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    /**
+     * @return always true
+     */
+    @Override
+    public boolean isStarted() {
+        return true;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+
+/**
+ * An event generated when a consumer stops.
+ *
+ * @version $Revision: 563921 $
+ */
+public class ConsumerStoppedEvent extends ConsumerEvent {
+
+ private static final long serialVersionUID = 5378835541037193206L;
+
+ /**
+ * Creates the event; every argument is forwarded unchanged to the
+ * superclass constructor.
+ *
+ * @param source the event source
+ * @param destination the destination
+ * @param consumerId the id of the consumer that stopped
+ * @param count the count passed on to the superclass
+ */
+ public ConsumerStoppedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerId consumerId, int count) {
+ super(source, destination, consumerId, count);
+ }
+
+ /**
+ * @return always {@code false} for this event type
+ */
+ public boolean isStarted() {
+ return false;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * An event caused when a destination is created or deleted
+ *
+ * @version $Revision: 634277 $
+ */
+public class DestinationEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+
+    /** The command this event wraps; assigned exactly once, in the constructor. */
+    private final DestinationInfo destinationInfo;
+
+    /**
+     * Creates an event wrapping the given destination command.
+     *
+     * @param source the source reporting the change; passed to {@link EventObject}
+     * @param destinationInfo the command describing the destination change
+     */
+    public DestinationEvent(DestinationSource source, DestinationInfo destinationInfo) {
+        super(source);
+        this.destinationInfo = destinationInfo;
+    }
+
+    /**
+     * @return the destination carried by the wrapped {@link DestinationInfo}
+     */
+    public ActiveMQDestination getDestination() {
+        return getDestinationInfo().getDestination();
+    }
+
+    /**
+     * @return the value of {@code isAddOperation()} on the wrapped {@link DestinationInfo}
+     */
+    public boolean isAddOperation() {
+        return getDestinationInfo().isAddOperation();
+    }
+
+    /**
+     * @return the timeout reported by the wrapped {@link DestinationInfo}
+     */
+    public long getTimeout() {
+        return getDestinationInfo().getTimeout();
+    }
+
+    /**
+     * @return the value of {@code isRemoveOperation()} on the wrapped {@link DestinationInfo}
+     */
+    public boolean isRemoveOperation() {
+        return getDestinationInfo().isRemoveOperation();
+    }
+
+    /**
+     * @return the command this event was created with
+     */
+    public DestinationInfo getDestinationInfo() {
+        return destinationInfo;
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+/**
+ * Listen to the changes in destinations being created or destroyed
+ *
+ * @version $Revision: 634277 $
+ */
+public interface DestinationListener {
+ /**
+ * Callback that receives a {@link DestinationEvent}.
+ *
+ * @param event the event being delivered
+ */
+ void onDestinationEvent(DestinationEvent event);
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
+ * being created or deleted.
+ *
+ * @version $Revision: 681153 $
+ */
+public class DestinationSource implements MessageListener {
+ private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
+ private AtomicBoolean started = new AtomicBoolean(false);
+ private final Connection connection;
+ private Session session;
+ private MessageConsumer queueConsumer;
+ private MessageConsumer topicConsumer;
+ private MessageConsumer tempTopicConsumer;
+ private MessageConsumer tempQueueConsumer;
+ private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
+ private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
+ private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
+ private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
+ private DestinationListener listener;
+
+ public DestinationSource(Connection connection) throws JMSException {
+ this.connection = connection;
+ }
+
+ public DestinationListener getListener() {
+ return listener;
+ }
+
+ public void setDestinationListener(DestinationListener listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * Returns the current queues available on the broker
+ */
+ public Set<ActiveMQQueue> getQueues() {
+ return queues;
+ }
+
+ /**
+ * Returns the current topics on the broker
+ */
+ public Set<ActiveMQTopic> getTopics() {
+ return topics;
+ }
+
+ /**
+ * Returns the current temporary topics available on the broker
+ */
+ public Set<ActiveMQTempQueue> getTemporaryQueues() {
+ return temporaryQueues;
+ }
+
+ /**
+ * Returns the current temporary queues available on the broker
+ */
+ public Set<ActiveMQTempTopic> getTemporaryTopics() {
+ return temporaryTopics;
+ }
+
+ public void start() throws JMSException {
+ if (started.compareAndSet(false, true)) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+ queueConsumer.setMessageListener(this);
+
+ topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
+ topicConsumer.setMessageListener(this);
+
+ tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
+ tempQueueConsumer.setMessageListener(this);
+
+ tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
+ tempTopicConsumer.setMessageListener(this);
+ }
+ }
+
+ public void stop() throws JMSException {
+ if (started.compareAndSet(true, false)) {
+ if (session != null) {
+ session.close();
+ }
+ }
+ }
+
+ public void onMessage(Message message) {
+ if (message instanceof ActiveMQMessage) {
+ ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+ Object command = activeMessage.getDataStructure();
+ if (command instanceof DestinationInfo) {
+ DestinationInfo destinationInfo = (DestinationInfo) command;
+ DestinationEvent event = new DestinationEvent(this, destinationInfo);
+ fireDestinationEvent(event);
+ }
+ else {
+ LOG.warn("Unknown dataStructure: " + command);
+ }
+ }
+ else {
+ LOG.warn("Unknown message type: " + message + ". Message ignored");
+ }
+ }
+
+ protected void fireDestinationEvent(DestinationEvent event) {
+ // now lets update the data structures
+ ActiveMQDestination destination = event.getDestination();
+ boolean add = event.isAddOperation();
+ if (destination instanceof ActiveMQQueue) {
+ ActiveMQQueue queue = (ActiveMQQueue) destination;
+ if (add) {
+ queues.add(queue);
+ }
+ else {
+ queues.remove(queue);
+ }
+ }
+ else if (destination instanceof ActiveMQTopic) {
+ ActiveMQTopic topic = (ActiveMQTopic) destination;
+ if (add) {
+ topics.add(topic);
+ }
+ else {
+ topics.remove(topic);
+ }
+ }
+ else if (destination instanceof ActiveMQTempQueue) {
+ ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
+ if (add) {
+ temporaryQueues.add(queue);
+ }
+ else {
+ temporaryQueues.remove(queue);
+ }
+ }
+ else if (destination instanceof ActiveMQTempTopic) {
+ ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
+ if (add) {
+ temporaryTopics.add(topic);
+ }
+ else {
+ temporaryTopics.remove(topic);
+ }
+ }
+ else {
+ LOG.warn("Unknown destination type: " + destination);
+ }
+ if (listener != null) {
+ listener.onDestinationEvent(event);
+ }
+ }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.EventObject;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ProducerId;
+
+/**
+ * An event when the number of producers on a given destination changes.
+ *
+ * @version $Revision: 359679 $
+ */
+public abstract class ProducerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+
+    private final Destination destination;
+    private final ProducerId producerId;
+    private final int producerCount;
+
+    /**
+     * Stores the supplied values and registers {@code source} as the event source.
+     *
+     * @param source the event source this event originates from
+     * @param destination the destination the event refers to
+     * @param producerId the id of the producer the event refers to
+     * @param producerCount the producer count to report with this event
+     */
+    public ProducerEvent(ProducerEventSource source, Destination destination, ProducerId producerId, int producerCount) {
+        super(source);
+        this.producerCount = producerCount;
+        this.producerId = producerId;
+        this.destination = destination;
+    }
+
+    /**
+     * @return {@code true} or {@code false} as defined by the concrete event type
+     */
+    public abstract boolean isStarted();
+
+    /**
+     * @return the event source, cast to {@link ProducerEventSource}
+     */
+    public ProducerEventSource getAdvisor() {
+        final Object origin = getSource();
+        return (ProducerEventSource) origin;
+    }
+
+    /**
+     * @return the destination given at construction
+     */
+    public Destination getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @return the producer id given at construction
+     */
+    public ProducerId getProducerId() {
+        return this.producerId;
+    }
+
+    /**
+     * Returns the current number of producers active at the time this advisory was sent.
+     */
+    public int getProducerCount() {
+        return this.producerCount;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An object which can be used to listen to the number of active producers
+ * available on a given destination.
+ *
+ * @version $Revision: 359679 $
+ */
+public class ProducerEventSource implements Service, MessageListener {
+    private static final Log LOG = LogFactory.getLog(ProducerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    // volatile: assigned through the setter, read from onMessage(), which the
+    // JMS provider may invoke on a different thread
+    private volatile ProducerListener listener;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicInteger producerCount = new AtomicInteger();
+    private Session session;
+    private MessageConsumer consumer;
+
+    /**
+     * Creates a source for producer advisories on the given destination.
+     * Nothing is subscribed until {@link #start()} is called.
+     *
+     * @param connection the connection used to create the advisory session
+     * @param destination the destination to watch; converted with
+     *        {@code ActiveMQDestination.transform}
+     * @throws JMSException if the destination cannot be transformed
+     */
+    public ProducerEventSource(Connection connection, Destination destination) throws JMSException {
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    /**
+     * Registers the listener notified for every producer event.
+     *
+     * @param listener the listener to notify, or {@code null} to disable notification
+     */
+    public void setProducerListener(ProducerListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Creates a session and subscribes this object to the producer advisory
+     * topic of the watched destination. Calling it while already started does
+     * nothing.
+     *
+     * @throws Exception if the session or consumer cannot be created; in that
+     *         case the partially created session is closed and the source is
+     *         left stopped so that start() can be retried
+     */
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            boolean success = false;
+            try {
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination);
+                consumer = session.createConsumer(advisoryTopic);
+                consumer.setMessageListener(this);
+                success = true;
+            } finally {
+                if (!success) {
+                    // Undo the partial start: without this the session would leak and
+                    // the started flag would block every later start() attempt.
+                    discardSessionAfterFailedStart();
+                    started.set(false);
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the session if the source is currently started; otherwise does nothing.
+     *
+     * @throws Exception if closing the session fails
+     */
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Receives an advisory message and fires the matching producer event.
+     * A {@link ProducerInfo} increments the local count and fires a started
+     * event; a {@link RemoveInfo} that is a producer removal decrements it and
+     * fires a stopped event. In both cases a numeric {@code producerCount}
+     * message property, when present, replaces the local count in the event.
+     * Other messages are logged at warn level, except a {@link RemoveInfo}
+     * that is not a producer removal, which is ignored silently.
+     *
+     * @param message the advisory message delivered by the JMS provider
+     */
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            Object command = activeMessage.getDataStructure();
+            if (command instanceof ProducerInfo) {
+                int count = producerCount.incrementAndGet();
+                count = extractProducerCountFromMessage(message, count);
+                fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count));
+            } else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo)command;
+                if (removeInfo.isProducerRemove()) {
+                    int count = producerCount.decrementAndGet();
+                    count = extractProducerCountFromMessage(message, count);
+                    fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count));
+                }
+            } else {
+                LOG.warn("Unknown command: " + command);
+            }
+        } else {
+            LOG.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    /**
+     * Reads the {@code producerCount} property of the message.
+     *
+     * @param message the advisory message
+     * @param count the fallback value
+     * @return the property as an int when it is a {@link Number}; otherwise
+     *         {@code count}, after logging a warning
+     */
+    protected int extractProducerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("producerCount");
+            if (value instanceof Number) {
+                Number n = (Number)value;
+                return n.intValue();
+            }
+            LOG.warn("No producerCount header available on the message: " + message);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
+        }
+        return count;
+    }
+
+    /**
+     * Passes the event to the registered listener, if any.
+     *
+     * @param event the event to deliver
+     */
+    protected void fireProducerEvent(ProducerEvent event) {
+        // Read the field once so a concurrent setProducerListener(null)
+        // cannot slip in between the null check and the call.
+        ProducerListener current = listener;
+        if (current != null) {
+            current.onProducerEvent(event);
+        }
+    }
+
+    /**
+     * Closes and forgets whatever session a failed start() created. A failure
+     * to close is only logged so that the original start() error propagates.
+     */
+    private void discardSessionAfterFailedStart() {
+        Session failed = session;
+        session = null;
+        consumer = null;
+        if (failed != null) {
+            try {
+                failed.close();
+            } catch (JMSException closeError) {
+                LOG.warn("Failed to close session after unsuccessful start: " + closeError, closeError);
+            }
+        }
+    }
+
+}