You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 19:59:13 UTC
svn commit: r752973 [1/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/...
Author: chirino
Date: Thu Mar 12 18:59:12 2009
New Revision: 752973
URL: http://svn.apache.org/viewvc?rev=752973&view=rev
Log:
Refactoring the MockBroker so that it is more agnostic of the wire protocol used.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,253 @@
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.Message;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+/**
+ * Base class for a connection layered on a {@link Transport}. It registers
+ * itself as the transport's listener, writes outbound objects either inline or
+ * through {@code blockingWriter}, and exposes {@code outputQueue} as its sink.
+ */
+abstract public class Connection implements TransportListener, DeliveryTarget {
+
+    protected Transport transport;
+
+    protected String name;
+
+    private int priorityLevels;
+
+    protected final int outputWindowSize = 1000;
+    protected final int outputResumeThreshold = 900;
+
+    protected final int inputWindowSize = 1000;
+    protected final int inputResumeThreshold = 500;
+
+    // Never assigned in this class; presumably set up by subclasses -- TODO confirm.
+    protected IFlowRelay<MessageDelivery> outputQueue;
+
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    protected Flow outputFlow;
+    protected boolean blockingTransport = false;
+    protected ExecutorService blockingWriter;
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    /** Registers this connection as the transport's listener and starts the transport. */
+    public void start() throws Exception {
+        transport.setTransportListener(this);
+        transport.start();
+    }
+
+    /** Flags the connection as stopping, then stops the transport and shuts down the blocking writer when set. */
+    public void stop() throws Exception {
+        stopping.set(true);
+        if (transport != null) {
+            transport.stop();
+        }
+        if (blockingWriter != null) {
+            blockingWriter.shutdown();
+        }
+    }
+
+    protected void initialize() {
+    }
+
+    /**
+     * Sends {@code o} over the transport. When {@code blockingTransport} is
+     * false the write happens inline; otherwise it is handed to
+     * {@code blockingWriter}. I/O failures go to {@link #onException(IOException)}.
+     */
+    protected final void write(final Object o) {
+        // NOTE(review): locks on outputQueue, so this throws a
+        // NullPointerException while outputQueue is still unassigned.
+        synchronized (outputQueue) {
+            if (!blockingTransport) {
+                try {
+                    transport.oneway(o);
+                } catch (IOException e) {
+                    onException(e);
+                }
+            } else {
+                try {
+                    blockingWriter.execute(new Runnable() {
+                        public void run() {
+                            // Drop writes that only get to run after stop() was called.
+                            if (!stopping.get()) {
+                                try {
+                                    transport.oneway(o);
+                                } catch (IOException e) {
+                                    onException(e);
+                                }
+                            }
+                        }
+                    });
+                } catch (RejectedExecutionException re) {
+                    //Must be shutting down.
+                }
+            }
+        }
+    }
+
+    /** Forwards to {@link #onException(Exception)}. */
+    public void onException(IOException error) {
+        onException((Exception) error);
+    }
+
+    /** Prints the error and its stack trace unless the connection is stopping. */
+    public void onException(Exception error) {
+        if (!isStopping()) {
+            System.out.println("RemoteConnection error: " + error);
+            error.printStackTrace();
+        }
+    }
+
+    public boolean isStopping(){
+        return stopping.get();
+    }
+
+    public void transportInterupted() {
+    }
+
+    public void transportResumed() {
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getPriorityLevels() {
+        return priorityLevels;
+    }
+
+    public void setPriorityLevels(int priorityLevels) {
+        this.priorityLevels = priorityLevels;
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    /**
+     * Stores the dispatcher. If the transport is a {@link DispatchableTransport}
+     * it is also given this connection's name (when set) and the dispatcher.
+     */
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+        if (transport instanceof DispatchableTransport) {
+            DispatchableTransport dt = ((DispatchableTransport) transport);
+            if (name != null) {
+                dt.setName(name);
+            }
+            dt.setDispatcher(getDispatcher());
+        }
+    }
+
+    public int getOutputWindowSize() {
+        return outputWindowSize;
+    }
+
+    public int getOutputResumeThreshold() {
+        return outputResumeThreshold;
+    }
+
+    public int getInputWindowSize() {
+        return inputWindowSize;
+    }
+
+    public int getInputResumeThreshold() {
+        return inputResumeThreshold;
+    }
+
+    public IFlowRelay<MessageDelivery> getSink() {
+        return outputQueue;
+    }
+
+    /** Accepts every message. */
+    public boolean match(MessageDelivery message) {
+        return true;
+    }
+
+    /** A flow limiter that can additionally be handed credit received from the wire protocol. */
+    protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
+        public void onProtocolCredit(int credit);
+    }
+
+    /**
+     * {@link SizeLimiter} that, when not in client mode, accumulates the sizes
+     * removed from it and reports them through {@link #sendCredit(int)} in
+     * batches of at least {@code capacity - resumeThreshold}.
+     */
+    protected class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
+        final Flow flow;
+        final boolean clientMode;
+        private int available;
+
+        public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+            super(capacity, resumeThreshold);
+            this.clientMode = clientMode;
+            this.flow = flow;
+        }
+
+        public void reserve(E elem) {
+            super.reserve(elem);
+        }
+
+        public void releaseReserved(E elem) {
+            // Must undo the reservation; delegating to reserve() here would
+            // reserve the element a second time and never give it back.
+            super.releaseReserved(elem);
+        }
+
+        /**
+         * Removes {@code size} from the limiter. Outside client mode the amount
+         * is also added to the pending credit, which is sent and reset once it
+         * reaches {@code capacity - resumeThreshold}.
+         */
+        protected void remove(int size) {
+            super.remove(size);
+            if (!clientMode) {
+                available += size;
+                if (available >= capacity - resumeThreshold) {
+                    sendCredit(available);
+                    available = 0;
+                }
+            }
+        }
+
+        /** Hook for sending credit to the peer; this base version always throws. */
+        protected void sendCredit(int credit) {
+            throw new UnsupportedOperationException("Please override this method to provide an implementation.");
+        }
+
+        /** Applies credit from the protocol by removing that amount from the limiter. */
+        public void onProtocolCredit(int credit) {
+            remove(credit);
+        }
+
+        // NOTE(review): takes MessageDelivery rather than E, so this may be an
+        // overload instead of an override of the limiter's sizing hook -- confirm.
+        public int getElementSize(MessageDelivery m) {
+            return m.getFlowLimiterSize();
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,148 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.broker.openwire.OpenwireBrokerConnection;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.transport.DispatchableTransportServer;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+/**
+ * Binds a transport server to the configured URI and wraps every accepted
+ * transport in an {@link OpenwireBrokerConnection} that shares this broker's dispatcher.
+ */
+public class Broker implements TransportAcceptListener {
+
+    public static final int MAX_USER_PRIORITY = 10;
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+
+    final Router router = new Router();
+
+    final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
+    final HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
+
+    private TransportServer transportServer;
+    private String uri;
+    private String name;
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Registers {@code queue} with the router domain named by its destination.
+     * NOTE(review): the {@code queues} map is not updated here -- confirm intent.
+     */
+    public void addQueue(Queue queue) {
+        Domain domain = router.getDomain(queue.getDestination().getDomain());
+        domain.add(queue.getDestination().getName(), queue);
+    }
+
+    /**
+     * Flags the broker as stopping, then stops the transport server, the client
+     * connections and the queues, and finally shuts the dispatcher down.
+     */
+    public final void stop() throws Exception {
+        stopping.set(true);
+        // transportServer is only assigned in start(); tolerate stop() before start().
+        if (transportServer != null) {
+            transportServer.stop();
+        }
+
+        for (Connection connection : clientConnections) {
+            connection.stop();
+        }
+        for (Queue queue : queues.values()) {
+            queue.stop();
+        }
+        if (dispatcher != null) {
+            dispatcher.shutdown();
+        }
+    }
+
+    /**
+     * Starts the dispatcher, binds and starts the transport server for the
+     * configured URI, then starts every queue in {@code queues}.
+     */
+    public final void start() throws Exception {
+        dispatcher.start();
+
+        transportServer = TransportFactory.bind(new URI(uri));
+        transportServer.setAcceptListener(this);
+        if (transportServer instanceof DispatchableTransportServer) {
+            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
+        }
+        transportServer.start();
+
+        for (Queue queue : queues.values()) {
+            queue.start();
+        }
+    }
+
+    /**
+     * Wraps the accepted transport in a connection, records it in
+     * {@code clientConnections} and starts it; start failures go to {@link #onAcceptError}.
+     */
+    public void onAccept(final Transport transport) {
+        OpenwireBrokerConnection connection = new OpenwireBrokerConnection();
+        connection.setBroker(this);
+        connection.setTransport(transport);
+        connection.setPriorityLevels(MAX_PRIORITY);
+        connection.setDispatcher(dispatcher);
+        clientConnections.add(connection);
+        try {
+            connection.start();
+        } catch (Exception e1) {
+            onAcceptError(e1);
+        }
+    }
+
+    /** Prints the accept error and its stack trace. */
+    public void onAcceptError(Exception error) {
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /** Returns the transport server's connect URI; only usable after {@link #start()}. */
+    public URI getConnectURI() {
+        return transportServer.getConnectURI();
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    public Router getRouter() {
+        return router;
+    }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,29 @@
+package org.apache.activemq.broker;
+
+import org.apache.activemq.Connection;
+
+/**
+ * A {@link Connection} that belongs to a {@link Broker}.
+ */
+abstract public class BrokerConnection extends Connection {
+
+    protected Broker broker;
+
+    public Broker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+
+    /**
+     * Reports stopping when either this connection or its broker is stopping.
+     * A connection without a broker falls back to its own state.
+     */
+    @Override
+    public boolean isStopping() {
+        return super.isStopping() || (broker != null && broker.isStopping());
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,19 @@
+/**
+ *
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.flow.IFlowSink;
+
+/**
+ * Something that message deliveries can be handed to.
+ */
+public interface DeliveryTarget {
+
+    /** @return the sink that deliveries for this target are offered to */
+    public IFlowSink<MessageDelivery> getSink();
+
+    /** @return whether this target accepts the given message */
+    public boolean match(MessageDelivery message);
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,91 @@
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Addresses either one destination (a domain plus a name) or a collection of
+ * destinations. The implementations below return {@code null} from the
+ * accessors that do not apply to them.
+ */
+public interface Destination {
+
+    AsciiBuffer getDomain();
+    AsciiBuffer getName();
+    Collection<Destination> getDestinations();
+
+    /** A single destination; {@link #getDestinations()} always returns {@code null}. */
+    public class SingleDestination implements Destination {
+
+        private AsciiBuffer domain;
+        private AsciiBuffer name;
+
+        public SingleDestination() {
+        }
+        public SingleDestination(AsciiBuffer domain, AsciiBuffer name) {
+            setDomain(domain);
+            setName(name);
+        }
+        public SingleDestination(String domain, String name) {
+            setDomain(domain);
+            setName(name);
+        }
+
+        public Collection<Destination> getDestinations() {
+            return null;
+        }
+
+        public AsciiBuffer getDomain() {
+            return domain;
+        }
+
+        public AsciiBuffer getName() {
+            return name;
+        }
+        public void setDomain(AsciiBuffer domain) {
+            this.domain = domain;
+        }
+        public void setName(AsciiBuffer name) {
+            this.name = name;
+        }
+
+        private void setName(String name) {
+            setName(new AsciiBuffer(name));
+        }
+        private void setDomain(String domain) {
+            setDomain(new AsciiBuffer(domain));
+        }
+    }
+
+    /** A group of destinations; {@link #getDomain()} and {@link #getName()} always return {@code null}. */
+    public class MultiDestination implements Destination {
+
+        private Collection<Destination> destinations;
+
+        public MultiDestination() {
+        }
+
+        public MultiDestination(Collection<Destination> destinations) {
+            this.destinations=destinations;
+        }
+
+        public Collection<Destination> getDestinations() {
+            return destinations;
+        }
+
+        public void setDestinations(Collection<Destination> destinations) {
+            this.destinations = destinations;
+        }
+
+        public AsciiBuffer getDomain() {
+            return null;
+        }
+
+        public AsciiBuffer getName() {
+            return null;
+        }
+
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,27 @@
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Represents a messaging domain like pub/sub or point to point in JMS terms or an Exchange in
+ * AMQP terms.
+ *
+ * @author chirino
+ */
+public interface Domain {
+
+    /** Registers {@code value} under {@code name}; what kind of value is accepted is up to the implementation. */
+    public void add(AsciiBuffer name, Object value);
+
+    /** Removes the entry registered under {@code name}; the return value is implementation specific. */
+    public Object remove(AsciiBuffer name);
+
+    /** Attaches the delivery target {@code dt} to the entry called {@code name}. */
+    public void bind(AsciiBuffer name, DeliveryTarget dt);
+
+    /** Returns the delivery targets for {@code msg}; the implementations in this package return {@code null} when there are none. */
+    public Collection<DeliveryTarget> route(MessageDelivery msg);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,28 @@
+package org.apache.activemq.broker;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Protocol-neutral view of a message travelling through the broker: where it
+ * is going, its priority and size, and who produced it.
+ */
+public interface MessageDelivery {
+
+    public Destination getDestination();
+
+    public int getPriority();
+
+    /** Returns the size this delivery counts for in flow-control limiters. */
+    public int getFlowLimiterSize();
+
+    public AsciiBuffer getMsgId();
+
+    public AsciiBuffer getProducerId();
+
+    public void setCompletionCallback(Runnable runnable);
+    public Runnable getCompletionCallback();
+
+    /** Returns this delivery viewed as {@code type}; presumably {@code null} when unsupported -- confirm per implementation. */
+    public <T> T asType(Class<T> type);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,190 @@
+/**
+ *
+ */
+package org.apache.activemq.broker;
+
+import java.util.HashMap;
+
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.SharedPriorityQueue;
+import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.Subscription;
+
+/**
+ * A broker queue for one destination. {@link #start()} builds the underlying
+ * flow queue; consumers are attached as subscriptions on that queue.
+ */
+public class Queue implements DeliveryTarget {
+
+    HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
+    private Destination destination;
+    private IQueue<AsciiBuffer, MessageDelivery> queue;
+    private Broker broker;
+
+    private Mapper<Integer, MessageDelivery> partitionMapper;
+    private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
+
+    /**
+     * Builds the underlying queue: a partitioned queue whose partitions are
+     * shared flow queues when a partition mapper is set, otherwise one shared flow queue.
+     */
+    private IQueue<AsciiBuffer, MessageDelivery> createQueue() {
+
+        if (partitionMapper!=null) {
+            PartitionedQueue<Integer, AsciiBuffer, MessageDelivery> queue = new PartitionedQueue<Integer, AsciiBuffer, MessageDelivery>() {
+                @Override
+                protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
+                    return createSharedFlowQueue();
+                }
+            };
+            queue.setPartitionMapper(partitionMapper);
+            queue.setResourceName(destination.getName().toString());
+            return queue;
+        } else {
+            return createSharedFlowQueue();
+        }
+    }
+
+    /** Maps a delivery to its priority. */
+    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            return element.getPriority();
+        }
+    };
+
+    /**
+     * Creates a shared queue named after the destination, using a priority
+     * aware queue and limiter when {@code Broker.MAX_PRIORITY} is greater than 1.
+     */
+    private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
+        if (Broker.MAX_PRIORITY > 1) {
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, Broker.MAX_PRIORITY);
+            limiter.setPriorityMapper(PRIORITY_MAPPER);
+            SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
+            queue.setKeyMapper(keyExtractor);
+            queue.setAutoRelease(true);
+            queue.setDispatcher(broker.getDispatcher());
+            return queue;
+        } else {
+            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+            SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
+            queue.setKeyMapper(keyExtractor);
+            queue.setAutoRelease(true);
+            queue.setDispatcher(broker.getDispatcher());
+            return queue;
+        }
+    }
+
+    /** Adds {@code msg} to the underlying queue on behalf of {@code source}. */
+    public final void deliver(ISourceController<MessageDelivery> source, MessageDelivery msg) {
+        queue.add(msg, source);
+    }
+
+    public final Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Subscribes {@code dt} to this queue. The subscription is pre-acquired,
+     * removes messages on dispatch, and defers matching and the sink to {@code dt}.
+     */
+    public final void addConsumer(final DeliveryTarget dt) {
+        Subscription<MessageDelivery> sub = new Subscription<MessageDelivery>() {
+            public boolean isPreAcquired() {
+                return true;
+            }
+
+            public boolean matches(MessageDelivery message) {
+                return dt.match(message);
+            }
+
+            public boolean isRemoveOnDispatch() {
+                return true;
+            }
+
+            public IFlowSink<MessageDelivery> getSink() {
+                return dt.getSink();
+            }
+
+            @Override
+            public String toString() {
+                return getSink().toString();
+            }
+        };
+        subs.put(dt, sub);
+        queue.addSubscription(sub);
+    }
+
+    /**
+     * Removes the subscription that {@link #addConsumer} registered for {@code dt}.
+     *
+     * @return the queue's removal result, or {@code false} when {@code dt} was not subscribed
+     */
+    public boolean removeSubscription(final DeliveryTarget dt) {
+        Subscription<MessageDelivery> sub = subs.remove(dt);
+        if (sub != null) {
+            return queue.removeSubscription(sub);
+        }
+        return false;
+    }
+
+    /**
+     * @deprecated misspelled name kept for existing callers; use {@link #removeSubscription(DeliveryTarget)}.
+     */
+    @Deprecated
+    public boolean removeSubscirption(final DeliveryTarget dt) {
+        return removeSubscription(dt);
+    }
+
+    /** Creates the underlying queue; {@link #deliver} and {@link #addConsumer} need it to exist. */
+    public void start() throws Exception {
+        queue = createQueue();
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public IFlowSink<MessageDelivery> getSink() {
+        return queue;
+    }
+
+    public boolean match(MessageDelivery message) {
+        return true;
+    }
+
+    public Broker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+
+    public Mapper<Integer, MessageDelivery> getPartitionMapper() {
+        return partitionMapper;
+    }
+
+    public void setPartitionMapper(Mapper<Integer, MessageDelivery> partitionMapper) {
+        this.partitionMapper = partitionMapper;
+    }
+
+    public Mapper<AsciiBuffer, MessageDelivery> getKeyExtractor() {
+        return keyExtractor;
+    }
+
+    public void setKeyExtractor(Mapper<AsciiBuffer, MessageDelivery> keyExtractor) {
+        this.keyExtractor = keyExtractor;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,49 @@
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Point-to-point domain: maps a destination name to the {@link Queue}
+ * registered under it.
+ */
+public class QueueDomain implements Domain {
+
+    final HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
+
+    /** Registers {@code queue}, which must be a {@link Queue}, under {@code name}. */
+    public void add(AsciiBuffer name, Object queue) {
+        queues.put(name, (Queue)queue);
+    }
+
+    public Object remove(AsciiBuffer name) {
+        return queues.remove(name);
+    }
+
+    /** Adds {@code deliveryTarget} as a consumer of the queue registered under {@code name}. */
+    public void bind(AsciiBuffer name, DeliveryTarget deliveryTarget) {
+        queues.get(name).addConsumer(deliveryTarget);
+    }
+
+    /**
+     * Looks up the queue named by the message's destination.
+     *
+     * @return a one-element collection holding that queue, or {@code null}
+     *         when no queue is registered under the destination name
+     */
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        // The map is keyed by destination name, so the delivery object itself
+        // must not be used as the key -- it can never match an entry.
+        Queue queue = queues.get(msg.getDestination().getName());
+        if( queue!=null ) {
+            ArrayList<DeliveryTarget> rc = new ArrayList<DeliveryTarget>(1);
+            rc.add(queue);
+            return rc;
+        }
+        return null;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Keeps the {@link Domain} registered for each domain name and forwards bind
+ * and route requests to the domain named by the destination.
+ */
+final public class Router {
+
+    public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
+    public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
+
+    private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
+
+    /** Pre-registers the queue and topic domains. */
+    public Router() {
+        domains.put(QUEUE_DOMAIN, new QueueDomain());
+        domains.put(TOPIC_DOMAIN, new TopicDomain());
+    }
+
+    public Domain getDomain(AsciiBuffer name) {
+        return domains.get(name);
+    }
+
+    public Domain putDomain(AsciiBuffer name, Domain domain) {
+        return domains.put(name, domain);
+    }
+
+    public Domain removeDomain(Object name) {
+        return domains.remove(name);
+    }
+
+
+    /**
+     * Binds {@code dt} to {@code destination} in the destination's domain.
+     * NOTE(review): this is the only synchronized method that touches
+     * {@code domains}; the other accessors are unsynchronized -- confirm intent.
+     */
+    public synchronized void bind(Destination destination, DeliveryTarget dt) {
+        Domain domain = domains.get(destination.getDomain());
+        domain.bind(destination.getName(), dt);
+    }
+
+    /**
+     * Asks the domain of the message's destination for the delivery targets.
+     * A destination whose domain is not registered ends in a NullPointerException.
+     */
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        Domain domain = domains.get(msg.getDestination().getDomain());
+        return domain.route(msg);
+    }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,47 @@
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * Publish/subscribe domain: keeps the list of delivery targets bound to each
+ * topic name.
+ */
+public class TopicDomain implements Domain {
+
+    final HashMap<AsciiBuffer, ArrayList<DeliveryTarget>> topicsTargets = new HashMap<AsciiBuffer, ArrayList<DeliveryTarget>>();
+
+    /** No-op: this domain keeps no per-topic object, only bound targets. */
+    public void add(AsciiBuffer name, Object queue) {
+    }
+
+    /** No-op counterpart of {@link #add}; always returns {@code null}. */
+    public Object remove(AsciiBuffer name) {
+        return null;
+    }
+
+    /** Appends {@code target} to the targets of topic {@code name}, creating the list on first use. */
+    public void bind(AsciiBuffer name, DeliveryTarget target) {
+        ArrayList<DeliveryTarget> targets = topicsTargets.get(name);
+        if (targets == null) {
+            targets = new ArrayList<DeliveryTarget>();
+            topicsTargets.put(name, targets);
+        }
+        targets.add(target);
+    }
+
+    /**
+     * Returns the targets bound to the topic named by the message's destination.
+     *
+     * @return the live target list for that topic, or {@code null} when nothing is bound
+     */
+    public Collection<DeliveryTarget> route(MessageDelivery msg) {
+        // Targets are keyed by topic name, so look up the destination name
+        // rather than the delivery object.
+        return topicsTargets.get(msg.getDestination().getName());
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,75 @@
+package org.apache.activemq.broker.openwire;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * {@link MessageDelivery} backed by an OpenWire {@link Message}.
+ */
+public class OpenWireMessageDelivery implements MessageDelivery {
+
+    private final Message message;
+    private Destination destination;
+    private AsciiBuffer producerId;
+    private Runnable completionCallback;
+
+    public OpenWireMessageDelivery(Message message) {
+        this.message = message;
+    }
+
+    /** Converts the OpenWire destination on first use and caches the result. */
+    public Destination getDestination() {
+        if( destination == null ) {
+            destination = OpenwireBrokerConnection.convert(message.getDestination());
+        }
+        return destination;
+    }
+
+    public int getFlowLimiterSize() {
+        return message.getSize();
+    }
+
+    public int getPriority() {
+        return message.getPriority();
+    }
+
+    /** Always returns {@code null}. */
+    public AsciiBuffer getMsgId() {
+        return null;
+    }
+
+    /** Builds the producer id from the message's producer id string on first use and caches it. */
+    public AsciiBuffer getProducerId() {
+        if( producerId == null ) {
+            producerId = new AsciiBuffer(message.getProducerId().toString());
+        }
+        return producerId;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public Runnable getCompletionCallback() {
+        return completionCallback;
+    }
+
+    public void setCompletionCallback(Runnable completionCallback) {
+        this.completionCallback = completionCallback;
+    }
+
+    /**
+     * Returns the wrapped message when it is an instance of {@code type}.
+     *
+     * @return the message cast to {@code type}, or {@code null} when it is not of that type
+     */
+    public <T> T asType(Class<T> type) {
+        if( type.isInstance(message) ) {
+            return type.cast(message);
+        }
+        return null;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,472 @@
+package org.apache.activemq.broker.openwire;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.LogicExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.CommandVisitor;
+
+public class OpenwireBrokerConnection extends BrokerConnection {
+
+ protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+ protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+
+ protected final Object inboundMutex = new Object();
+ protected IFlowController<MessageDelivery> inboundController;
+
+ protected IFlowController<MessageDelivery> outboundController;
+// public ProtocolLimiter<MessageDelivery> outboundLimiter;
+ protected Flow ouboundFlow;
+
+ public void onCommand(Object o) {
+ final Command command = (Command) o;
+ boolean responseRequired = command.isResponseRequired();
+ try {
+ command.visit(new CommandVisitor() {
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods that keep track of the client state
+ ///////////////////////////////////////////////////////////////////
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+ return ack(command);
+ }
+ public Response processAddSession(SessionInfo info) throws Exception {
+ return ack(command);
+ }
+ public Response processAddProducer(ProducerInfo info) throws Exception {
+ producers.put(info.getProducerId(), new ProducerContext(info));
+ return ack(command);
+ }
+ public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ ConsumerContext ctx = new ConsumerContext(info);
+ consumers.put(info.getConsumerId(), ctx);
+
+ broker.getRouter().bind(convert(info.getDestination()), ctx);
+
+
+ return ack(command);
+ }
+ public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+ return ack(command);
+ }
+ public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+ return ack(command);
+ }
+ public Response processRemoveProducer(ProducerId info) throws Exception {
+ producers.remove(info);
+ return ack(command);
+ }
+ public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+ return ack(command);
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Message Processing Methods.
+ ///////////////////////////////////////////////////////////////////
+ public Response processMessage(Message info) throws Exception {
+ ProducerId producerId = info.getProducerId();
+ ProducerContext producerContext = producers.get(producerId);
+
+ OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+
+ // Only producers that are not using a window will block, and if it blocks.
+ // yes we block the connection's read thread. yes other sessions will not get
+ // serviced while we block here. The producer is depending on TCP flow
+ // control to slow him down so we have to stop ready from the socket at this
+ // point.
+ while( !producerContext.controller.offer(md, null) ) {
+ producerContext.controller.waitForFlowUnblock();
+ }
+ return null;
+ }
+ public Response processMessageAck(MessageAck info) throws Exception {
+ return ack(command);
+ }
+ public Response processMessagePull(MessagePull info) throws Exception {
+ return ack(command);
+ }
+ public Response processProducerAck(ProducerAck info) throws Exception {
+ return ack(command);
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Control Methods
+ ///////////////////////////////////////////////////////////////////
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
+ return ack(command);
+ }
+ public Response processShutdown(ShutdownInfo info) throws Exception {
+ return ack(command);
+ }
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+ return ack(command);
+ }
+ public Response processFlush(FlushCommand info) throws Exception {
+ return ack(command);
+ }
+ public Response processConnectionControl(ConnectionControl info) throws Exception {
+ return ack(command);
+ }
+ public Response processConnectionError(ConnectionError info) throws Exception {
+ return ack(command);
+ }
+ public Response processConsumerControl(ConsumerControl info) throws Exception {
+ return ack(command);
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods for server management
+ ///////////////////////////////////////////////////////////////////
+ public Response processAddDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processControlCommand(ControlCommand info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods for transaction management
+ ///////////////////////////////////////////////////////////////////
+ public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods for cluster operations
+ ///////////////////////////////////////////////////////////////////
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processMessageDispatch(MessageDispatch info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ });
+ } catch (Exception e) {
+ if (responseRequired) {
+ ExceptionResponse response = new ExceptionResponse(e);
+ response.setCorrelationId(command.getCommandId());
+ write(response);
+ } else {
+ onException(e);
+ }
+
+ }
+ }
+
+
+ ///////////////////////////////////////////////////////////////////
+ // Internal Support Methods
+ ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Builds the acknowledgement for a command.
+     *
+     * This method only creates the response object; it does not send it.
+     *
+     * @param command the command to acknowledge
+     * @return a response correlated to the command's id, or {@code null}
+     *         when the command does not require a response
+     */
+    private Response ack(Command command) {
+        if (!command.isResponseRequired()) {
+            return null;
+        }
+        Response response = new Response();
+        response.setCorrelationId(command.getCommandId());
+        return response;
+    }
+
+    /**
+     * Starts the connection and then announces the broker to the peer by
+     * writing a {@link BrokerInfo} carrying the broker's name (also used as
+     * its id) and connect URI.
+     */
+    @Override
+    public void start() throws Exception {
+        super.start();
+
+        final String brokerName = broker.getName();
+        BrokerInfo brokerInfo = new BrokerInfo();
+        brokerInfo.setBrokerId(new BrokerId(brokerName));
+        brokerInfo.setBrokerName(brokerName);
+        brokerInfo.setBrokerURL(broker.getConnectURI().toString());
+        write(brokerInfo);
+    }
+
+    /**
+     * Do-nothing {@link FlowControllable} base class: it exposes neither a
+     * sink nor a source and ignores accepted elements. Subclasses in this
+     * file override {@link #flowElemAccepted} to route the element.
+     */
+    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+
+        /** @return always {@code null} */
+        public IFlowSource<MessageDelivery> getFlowSource() {
+            return null;
+        }
+
+        /** @return always {@code null} */
+        public IFlowSink<MessageDelivery> getFlowSink() {
+            return null;
+        }
+
+        /** Intentionally empty. */
+        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        }
+    }
+
+
+    /**
+     * Per-consumer state: the consumer's {@link ConsumerInfo} and the
+     * selector expression built from it, used to decide whether a delivery
+     * matches this consumer.
+     */
+    class ConsumerContext implements DeliveryTarget {
+
+        private final ConsumerInfo info;
+        private String name;
+        private BooleanExpression selector;
+
+        /**
+         * @param info the consumer registration received from the client
+         * @throws InvalidSelectorException if the consumer's selector cannot be parsed
+         */
+        public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+            this.info = info;
+            this.name = info.getConsumerId().toString();
+            this.selector = parseSelector(info);
+        }
+
+        /**
+         * Not implemented yet.
+         *
+         * @return always {@code null}
+         */
+        public IFlowSink<MessageDelivery> getSink() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        /**
+         * @param message the delivery to test
+         * @return {@code false} when the delivery cannot be viewed as an
+         *         OpenWire message or the selector evaluation fails;
+         *         otherwise {@code true} when there is no selector or the
+         *         selector accepts the message
+         */
+        public boolean match(MessageDelivery message) {
+            final Message openwireMessage = message.asType(Message.class);
+            if (openwireMessage == null) {
+                return false;
+            }
+
+            MessageEvaluationContext evaluation = new MessageEvaluationContext();
+            evaluation.setMessageReference(openwireMessage);
+            evaluation.setDestination(info.getDestination());
+
+            if (selector == null) {
+                return true;
+            }
+            try {
+                return selector.matches(evaluation);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                return false;
+            }
+        }
+
+    }
+
+ private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+ BooleanExpression rc = null;
+ if (info.getSelector() != null) {
+ rc = SelectorParser.parse(info.getSelector());
+ }
+ if (info.isNoLocal()) {
+ if (rc == null) {
+ rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+ } else {
+ rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+ }
+ }
+ if (info.getAdditionalPredicate() != null) {
+ if (rc == null) {
+ rc = info.getAdditionalPredicate();
+ } else {
+ rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+ }
+ }
+ return rc;
+ }
+
+    /**
+     * Per-producer state. A producer that requested a window gets its own
+     * flow controller whose limiter sends {@link ProducerAck} credits back
+     * to the client; every other producer uses the connection's
+     * {@code inboundController} as it is at construction time.
+     */
+    class ProducerContext {
+
+        private final ProducerInfo info;
+        private IFlowController<MessageDelivery> controller;
+        private String name;
+
+        /**
+         * @param info the producer registration received from the client
+         */
+        public ProducerContext(final ProducerInfo info) {
+            this.info = info;
+            this.name = info.getProducerId().toString();
+
+            // Openwire only uses credit windows at the producer level for
+            // producers that request the feature.
+            final int windowSize = info.getWindowSize();
+            if (windowSize <= 0) {
+                controller = inboundController;
+                return;
+            }
+
+            Flow producerFlow = new Flow(info.getProducerId().toString(), false);
+
+            WindowLimiter<MessageDelivery> creditLimiter = new WindowLimiter<MessageDelivery>(false, producerFlow, windowSize, windowSize / 2) {
+                @Override
+                protected void sendCredit(int credit) {
+                    write(new ProducerAck(info.getProducerId(), credit));
+                }
+            };
+
+            FlowControllableAdapter router = new FlowControllableAdapter() {
+                public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                    route(controller, elem);
+                }
+                public String toString() {
+                    return name;
+                }
+            };
+            controller = new FlowController<MessageDelivery>(router, producerFlow, creditLimiter, inboundMutex);
+        }
+    }
+
+    /**
+     * Routes an accepted delivery to every matching target and acknowledges
+     * it to the producer when the message requires a response.
+     *
+     * When the router returns targets, a persistent message is acknowledged
+     * from its completion callback and a non-persistent one immediately.
+     * When the router returns no targets, the message is acknowledged
+     * immediately. In every case the element is reported to the source
+     * controller as dispatched.
+     *
+     * @param controller the source controller the element came from
+     * @param elem the delivery to route; must be an {@link OpenWireMessageDelivery}
+     */
+    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        // TODO:
+        // Consider doing some caching of this target list. Most producers
+        // always send to the same destination.
+        Collection<DeliveryTarget> targets = broker.getRouter().route(elem);
+
+        final Message message = ((OpenWireMessageDelivery) elem).getMessage();
+        if (targets != null) {
+
+            if (message.isResponseRequired()) {
+                // We need to ack the message once we ensure we won't lose it.
+                // We know we won't lose it once it's persisted or delivered to
+                // a consumer, so set up a callback to get notified once one of
+                // those happens.
+                if (message.isPersistent()) {
+                    elem.setCompletionCallback(new Runnable() {
+                        public void run() {
+                            // ack(...) only builds the response; it has to be written.
+                            write(ack(message));
+                        }
+                    });
+                } else {
+                    // Let the client know the broker got the message.
+                    write(ack(message));
+                }
+            }
+
+            // Deliver the message to all the targets..
+            for (DeliveryTarget target : targets) {
+                if (target.match(elem)) {
+                    target.getSink().add(elem, controller);
+                }
+            }
+
+        } else {
+            // Let the client know we got the message even though there
+            // were no valid targets to deliver the message to.
+            if (message.isResponseRequired()) {
+                write(ack(message));
+            }
+        }
+        controller.elementDispatched(elem);
+    }
+
+ protected void initialize() {
+
+ // Setup the input processing..
+ Flow flow = new Flow(name, false);
+ SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+ inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ route(controller, elem);
+ }
+ public String toString() {
+ return name;
+ }
+ }, flow, limiter, inboundMutex);
+
+// ouboundFlow = new Flow(name, false);
+// outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
+// outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+// outboundController = outputQueue.getFlowController(ouboundFlow);
+//
+// if (transport instanceof DispatchableTransport) {
+// outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+//
+// public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+// write(message);
+// }
+// });
+//
+// } else {
+// blockingTransport = true;
+// blockingWriter = Executors.newSingleThreadExecutor();
+// outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+// public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+// write(message);
+// };
+// });
+//
+// }
+ }
+
+    /**
+     * Converts an OpenWire destination into the broker's
+     * {@link Destination} model.
+     *
+     * A composite destination becomes a {@link Destination.MultiDestination}
+     * holding the conversion of each member; a queue or topic becomes a
+     * {@link Destination.SingleDestination} in the matching router domain.
+     *
+     * @param dest the OpenWire destination to convert
+     * @return the equivalent broker destination
+     * @throws IllegalArgumentException if the destination is neither
+     *         composite, a queue nor a topic
+     */
+    public static Destination convert(ActiveMQDestination dest) {
+        if (dest.isComposite()) {
+            ActiveMQDestination[] members = dest.getCompositeDestinations();
+            ArrayList<Destination> converted = new ArrayList<Destination>(members.length);
+            for (ActiveMQDestination member : members) {
+                converted.add(convert(member));
+            }
+            return new Destination.MultiDestination(converted);
+        }
+
+        AsciiBuffer domain;
+        if (dest.isQueue()) {
+            domain = Router.QUEUE_DOMAIN;
+        } else if (dest.isTopic()) {
+            // This has to be an "else if": as two independent ifs, every
+            // queue fell into the topic check's else branch and was rejected.
+            domain = Router.TOPIC_DOMAIN;
+        } else {
+            throw new IllegalArgumentException("Unsupported domain type: " + dest);
+        }
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public class MockBrokerTest extends TestCase {
+
+ protected static final int PERFORMANCE_SAMPLES = 3;
+
+ protected static final int IO_WORK_AMOUNT = 0;
+ protected static final int FANIN_COUNT = 10;
+ protected static final int FANOUT_COUNT = 10;
+
+ protected static final int PRIORITY_LEVELS = 10;
+ protected static final boolean USE_INPUT_QUEUES = true;
+
+ // Set to put senders and consumers on separate brokers.
+ protected boolean multibroker = false;
+
+ // Set to mockup up ptp:
+ protected boolean ptp = false;
+
+ // Set to use tcp IO
+ protected boolean tcp = true;
+ // set to force marshalling even in the NON tcp case.
+ protected boolean forceMarshalling = false;
+
+ protected String sendBrokerURI;
+ protected String receiveBrokerURI;
+
+ // Set's the number of threads to use:
+ protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+ protected boolean usePartitionedQueue = false;
+
+ protected int producerCount;
+ protected int consumerCount;
+ protected int destCount;
+
+ protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+ protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+ protected Broker sendBroker;
+ protected Broker rcvBroker;
+ protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+ protected IDispatcher dispatcher;
+ protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+ final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+ final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    /** Maps a delivery to its message id; installed as the queue key extractor. */
+    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+        public AsciiBuffer map(MessageDelivery delivery) {
+            final AsciiBuffer messageId = delivery.getMsgId();
+            return messageId;
+        }
+    };
+    /**
+     * Maps a delivery to one of at most 10 partitions (0..9) derived from
+     * the hash code of its producer id.
+     */
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // We modulo 10 to have at most 10 partitions which the producers
+            // get split across. The remainder keeps the sign of the hash code,
+            // so take its absolute value; otherwise negative hashes would
+            // yield partitions -9..-1 as well.
+            return Math.abs(element.getProducerId().hashCode() % 10);
+        }
+    };
+
+    /**
+     * Starts the dispatcher and picks the broker URIs for the configured
+     * transport: tcp, marshalling pipe, or plain pipe.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+
+        if (tcp) {
+            sendBrokerURI = "tcp://localhost:10000?wireFormat=proto2";
+            receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto2";
+        } else if (forceMarshalling) {
+            sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
+            receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
+        } else {
+            sendBrokerURI = "pipe://SendBroker";
+            receiveBrokerURI = "pipe://ReceiveBroker";
+        }
+    }
+
+ protected IDispatcher createDispatcher() {
+ return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ }
+
+    /**
+     * Runs the rate report with 2 producers, 2 destinations and 2 consumers.
+     *
+     * NOTE(review): the method name suggests 10/10/10, but the counts are
+     * the same as in test_2_2_2 -- confirm which is intended.
+     */
+    public void test_10_10_10() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs the rate report with 1 producer and 1 destination; the consumer
+     * count is left at its default.
+     */
+    public void test_1_1_0() throws Exception {
+        destCount = 1;
+        producerCount = 1;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Runs the rate report with 1 producer, 1 destination and 1 consumer. */
+    public void test_1_1_1() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = 1;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs the rate report with {@code FANIN_COUNT} producers, 1 destination
+     * and {@code FANOUT_COUNT} consumers.
+     */
+    public void test_10_1_10() throws Exception {
+        destCount = 1;
+        consumerCount = FANOUT_COUNT;
+        producerCount = FANIN_COUNT;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs the rate report with {@code FANIN_COUNT} producers, 1 destination
+     * and 1 consumer.
+     */
+    public void test_10_1_1() throws Exception {
+        consumerCount = 1;
+        destCount = 1;
+        producerCount = FANIN_COUNT;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs the rate report with 1 producer, 1 destination and
+     * {@code FANOUT_COUNT} consumers.
+     */
+    public void test_1_1_10() throws Exception {
+        consumerCount = FANOUT_COUNT;
+        destCount = 1;
+        producerCount = 1;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Runs the rate report with 2 producers, 2 destinations and 2 consumers. */
+    public void test_2_2_2() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+
+        createConnections();
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs the rate report with 2 producers, 2 destinations and 2 consumers,
+     * where the first consumer is slowed down with a think time of 50. The
+     * producer feeding the other consumer should still be able to send
+     * quickly.
+     *
+     * @throws Exception
+     */
+    public void test_2_2_2_SlowConsumer() throws Exception {
+        consumerCount = 2;
+        destCount = 2;
+        producerCount = 2;
+
+        createConnections();
+        final RemoteConsumer slowConsumer = consumers.get(0);
+        slowConsumer.setThinkTime(50);
+
+        // Bring everything up, sample the rates, then always tear down.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+ public void test_2_2_2_Selector() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Add properties to match producers to their consumers
+ for (int i = 0; i < consumerCount; i++) {
+ String property = "match" + i;
+ consumers.get(i).setSelector(property);
+ producers.get(i).setProperty(property);
+ }
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ private void reportRates() throws InterruptedException {
+ System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
+ private void createConnections() throws IOException, URISyntaxException {
+
+ if (multibroker) {
+ sendBroker = createBroker("SendBroker", sendBrokerURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+ brokers.add(sendBroker);
+ brokers.add(rcvBroker);
+ } else {
+ sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+ brokers.add(sendBroker);
+ }
+
+ Destination[] dests = new Destination[destCount];
+
+ for (int i = 0; i < destCount; i++) {
+ Destination.SingleDestination bean = new Destination.SingleDestination();
+ bean.setName(new AsciiBuffer("dest" + (i + 1)));
+ bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
+ dests[i] = bean;
+ if (ptp) {
+ Queue queue = createQueue(sendBroker, dests[i]);
+ sendBroker.addQueue(queue);
+ if (multibroker) {
+ queue = createQueue(rcvBroker, dests[i]);
+ rcvBroker.addQueue(queue);
+ }
+ }
+ }
+
+ for (int i = 0; i < producerCount; i++) {
+ Destination destination = dests[i % destCount];
+ RemoteProducer producer = createProducer(i, destination);
+ producers.add(producer);
+ }
+
+ for (int i = 0; i < consumerCount; i++) {
+ Destination destination = dests[i % destCount];
+ RemoteConsumer consumer = createConsumer(i, destination);
+ consumers.add(consumer);
+ }
+
+ // Create MultiBroker connections:
+ // if (multibroker) {
+ // Pipe<Message> pipe = new Pipe<Message>();
+ // sendBroker.createBrokerConnection(rcvBroker, pipe);
+ // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+ // }
+ }
+
+ private RemoteConsumer createConsumer(int i, Destination destination) {
+ RemoteConsumer consumer = new RemoteConsumer();
+ consumer.setUri(rcvBroker.getConnectURI());
+ consumer.setDestination(destination);
+ consumer.setName("consumer" + (i + 1));
+ consumer.setTotalConsumerRate(totalConsumerRate);
+ consumer.setDispatcher(dispatcher);
+ return consumer;
+ }
+
+ private RemoteProducer createProducer(int id, Destination destination) {
+ RemoteProducer producer = new RemoteProducer();
+ producer.setUri(sendBroker.getConnectURI());
+ producer.setProducerId(id + 1);
+ producer.setName("producer" + (id + 1));
+ producer.setDestination(destination);
+ producer.setMessageIdGenerator(msgIdGenerator);
+ producer.setTotalProducerRate(totalProducerRate);
+ producer.setDispatcher(dispatcher);
+ return producer;
+ }
+
+    /**
+     * Creates a queue for the destination on the given broker, keyed by
+     * {@code KEY_MAPPER} and, when partitioned queues are enabled,
+     * partitioned by {@code PARTITION_MAPPER}. The queue is not registered
+     * with the broker here.
+     */
+    private Queue createQueue(Broker broker, Destination destination) {
+        Queue created = new Queue();
+        created.setBroker(broker);
+        created.setDestination(destination);
+        created.setKeyExtractor(KEY_MAPPER);
+        if (!usePartitionedQueue) {
+            return created;
+        }
+        created.setPartitionMapper(PARTITION_MAPPER);
+        return created;
+    }
+
+    /**
+     * Creates (but does not start) a broker with the given name and URI
+     * that uses the test's shared dispatcher.
+     */
+    private Broker createBroker(String name, String uri) {
+        Broker created = new Broker();
+        created.setName(name);
+        created.setUri(uri);
+        created.setDispatcher(dispatcher);
+        return created;
+    }
+
+    /**
+     * Stops the producers, then the consumers, then the brokers, and
+     * finally shuts the dispatcher down.
+     *
+     * Every service is stopped even when an earlier one fails, so a single
+     * failure does not leave the remaining services running; the first
+     * failure is rethrown once everything has been attempted.
+     *
+     * @throws Exception the first failure raised while stopping
+     */
+    private void stopServices() throws Exception {
+        Exception firstFailure = null;
+
+        for (RemoteProducer connection : producers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (RemoteConsumer connection : consumers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (Broker broker : brokers) {
+            try {
+                broker.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (dispatcher != null) {
+            try {
+                dispatcher.shutdown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Starts the brokers first, then the consumers, and the producers last.
+     */
+    private void startServices() throws Exception {
+        for (Broker each : brokers) {
+            each.start();
+        }
+
+        for (RemoteConsumer each : consumers) {
+            each.start();
+        }
+
+        for (RemoteProducer each : producers) {
+            each.start();
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,83 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+/**
+ * Static factory helpers that build OpenWire command objects (connection, session,
+ * producer and consumer infos, text messages and acks) for the broker tests.
+ * <p>
+ * Ids are drawn from plain static counters that are incremented without any
+ * synchronization.
+ */
+public class OpenWireSupport {
+
+    /** Priority used when the caller does not pass one; same value as {@code javax.jms.Message.DEFAULT_PRIORITY}. */
+    private static final int DEFAULT_PRIORITY = 4;
+
+    /** Shared counter for connection, session, producer and consumer ids. */
+    static private long idGenerator;
+    /** Counter for the sequence part of generated message ids. */
+    static private int msgIdGenerator;
+    // The two counters below are not referenced anywhere in this class.
+    static private int txGenerator;
+    static private int tempDestGenerator;
+
+    /**
+     * Creates a non-browser consumer on the given destination with a prefetch size of 1000
+     * and asynchronous dispatch disabled.
+     *
+     * @param sessionInfo the session the consumer belongs to
+     * @param destination the destination to consume from
+     * @return the new consumer info
+     */
+    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+
+    /**
+     * Returns the command that removes the given consumer.
+     *
+     * @param consumerInfo the consumer to close
+     * @return the remove command created by the consumer info itself
+     */
+    public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        return consumerInfo.createRemoveCommand();
+    }
+
+    /**
+     * Creates a producer info in the given session using the next generated id.
+     *
+     * @param sessionInfo the session the producer belongs to
+     * @return the new producer info
+     */
+    public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        return new ProducerInfo(sessionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a session info on the given connection using the next generated id.
+     *
+     * @param connectionInfo the connection the session belongs to
+     * @return the new session info
+     */
+    public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        return new SessionInfo(connectionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a connection info whose connection id is {@code "connection:" + nextId} and
+     * whose client id is that same value.
+     *
+     * @return the new connection info
+     */
+    public static ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+
+    /**
+     * Creates a text message with the default priority and no payload.
+     *
+     * @param producerInfo the producer the message id is derived from
+     * @param destination  the destination of the message
+     * @return the new message
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        return createMessage(producerInfo, destination, DEFAULT_PRIORITY, null);
+    }
+
+    /**
+     * Creates a non-persistent text message.
+     *
+     * @param producerInfo the producer the message id is derived from
+     * @param destination  the destination of the message
+     * @param priority     the JMS priority to set
+     * @param payload      the text body, or {@code null} to leave the body unset
+     * @return the new message
+     * @throws IllegalStateException if the freshly created message refuses the payload
+     */
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setJMSPriority(priority);
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        if (payload != null) {
+            try {
+                message.setText(payload);
+            } catch (MessageNotWriteableException e) {
+                // The message was created just above, so it is not expected to be read-only.
+                // Fail loudly rather than hand back a message that silently lost its body.
+                throw new IllegalStateException("Could not set text on newly created message", e);
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Creates an ack for the given consumer that refers to {@code msg} as the last message.
+     *
+     * @param consumerInfo the consumer sending the ack
+     * @param msg          the message whose destination and id are copied into the ack
+     * @param count        the message count to set on the ack
+     * @param ackType      the ack type to set on the ack
+     * @return the new ack
+     */
+    public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 18:59:12 2009
@@ -0,0 +1,169 @@
+package org.apache.activemq.broker.openwire;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+/**
+ * Test-side consumer connection. On {@link #start()} it connects a transport to the
+ * configured URI, sends its name followed by its destination (which acts as the
+ * subscribe), and then counts every received {@link MessageDelivery} in a
+ * {@link MetricCounter}. A per-message think time in milliseconds can be configured to
+ * simulate processing work.
+ */
+public class RemoteConsumer extends Connection {
+
+    private final MetricCounter consumerRate = new MetricCounter();
+
+    private MetricAggregator totalConsumerRate;
+    /** Simulated processing time per message, in milliseconds; values &lt;= 0 disable it. */
+    private long thinkTime;
+    private Destination destination;
+    private String selector;
+    private URI uri;
+
+    /**
+     * True when the transport is a {@link DispatchableTransport}; the think time is then
+     * scheduled on the dispatcher instead of sleeping on the calling thread.
+     */
+    private boolean scheduleWait;
+
+    protected final Object inboundMutex = new Object();
+    private FlowController<MessageDelivery> inboundController;
+
+    /**
+     * Registers this consumer's rate counter with the total, connects and starts the
+     * transport, then announces the name and subscribes by sending the destination.
+     *
+     * @throws Exception if the transport cannot be created, started or written to
+     */
+    public void start() throws Exception {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        if (transport instanceof DispatchableTransport) {
+            DispatchableTransport dt = (DispatchableTransport) transport;
+            dt.setName(name + "-client-transport");
+            dt.setDispatcher(getDispatcher());
+            scheduleWait = true;
+        }
+        transport.setTransportListener(this);
+        transport.start();
+
+        // Let the remote side know our name.
+        transport.oneway(name);
+        // Sending the destination acts as the subscribe.
+        transport.oneway(destination);
+        // NOTE(review): this invokes the superclass initialize(), not the override declared
+        // below, so inboundController is only set up here if the superclass ends up calling
+        // the override. Confirm against Connection before relying on it.
+        super.initialize();
+    }
+
+    /**
+     * Sets up inbound processing: a {@link FlowController} limited by a {@link SizeLimiter}
+     * built from {@code inputWindowSize} and {@code inputResumeThreshold}, which hands each
+     * accepted element to {@link #messageReceived}.
+     */
+    protected void initialize() {
+
+        // Setup the input processing..
+        Flow flow = new Flow(name, false);
+        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+        inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                messageReceived(controller, elem);
+            }
+
+            public String toString() {
+                return name;
+            }
+
+            public IFlowSink<MessageDelivery> getFlowSink() {
+                return null;
+            }
+
+            public IFlowSource<MessageDelivery> getFlowSource() {
+                return null;
+            }
+        }, flow, limiter, inboundMutex);
+    }
+
+    /**
+     * Transport callback. Any {@link MessageDelivery} is added to the inbound controller;
+     * every other command, and any exception raised while handling one, is reported through
+     * {@code onException}.
+     *
+     * @param command the object received from the transport
+     */
+    public void onCommand(Object command) {
+        try {
+            // instanceof rather than an exact class comparison, so that subtypes and
+            // implementations of MessageDelivery are accepted as well.
+            if (command instanceof MessageDelivery) {
+                inboundController.add((MessageDelivery) command, null);
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Handles one accepted delivery. Without a think time the delivery is counted and
+     * reported as dispatched immediately. With a think time the completion is either
+     * scheduled on the dispatcher (dispatchable transports) or performed after sleeping on
+     * the calling thread.
+     *
+     * @param controller the controller to notify once the element has been consumed
+     * @param elem       the delivery that was accepted
+     */
+    protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
+        if (thinkTime <= 0) {
+            markConsumed(controller, elem);
+            return;
+        }
+
+        if (scheduleWait) {
+            getDispatcher().schedule(new Runnable() {
+                public void run() {
+                    markConsumed(controller, elem);
+                }
+            }, thinkTime, TimeUnit.MILLISECONDS);
+            return;
+        }
+
+        try {
+            Thread.sleep(thinkTime);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to whoever owns this thread, then finish the delivery.
+            Thread.currentThread().interrupt();
+        }
+        markConsumed(controller, elem);
+    }
+
+    /** Bumps the consumer rate counter and tells the controller the element was dispatched. */
+    private void markConsumed(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        consumerRate.increment();
+        controller.elementDispatched(elem);
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    /** Sets the aggregator this consumer's rate counter is added to in {@link #start()}. */
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /** Sets the destination sent to the remote side in {@link #start()} to subscribe. */
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    /** Sets the simulated per-message processing time in milliseconds. */
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    public MetricCounter getConsumerRate() {
+        return consumerRate;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Sets the URI the transport connects to in {@link #start()}. */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+}