You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 21:18:26 UTC
svn commit: r752995 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
test/java/org/apache/activemq/broker/openwire/ test/java/org/apache/active...
Author: chirino
Date: Thu Mar 12 20:18:25 2009
New Revision: 752995
URL: http://svn.apache.org/viewvc?rev=752995&view=rev
Log:
Producer actually sends the broker messages now
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 20:18:25 2009
@@ -2,44 +2,33 @@
import java.io.IOException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.broker.DeliveryTarget;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.Message;
import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
-abstract public class Connection implements TransportListener, DeliveryTarget {
+abstract public class Connection implements TransportListener {
protected Transport transport;
-
protected String name;
private int priorityLevels;
-
protected final int outputWindowSize = 1000;
protected final int outputResumeThreshold = 900;
-
protected final int inputWindowSize = 1000;
protected final int inputResumeThreshold = 500;
- protected IFlowRelay<MessageDelivery> outputQueue;
-
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
- protected Flow outputFlow;
- protected boolean blockingTransport = false;
- protected ExecutorService blockingWriter;
+ private ExecutorService blockingWriter;
public void setTransport(Transport transport) {
this.transport = transport;
@@ -47,6 +36,15 @@
public void start() throws Exception {
transport.setTransportListener(this);
+ if (transport instanceof DispatchableTransport) {
+ DispatchableTransport dt = ((DispatchableTransport) transport);
+ if (name != null) {
+ dt.setName(name);
+ }
+ dt.setDispatcher(getDispatcher());
+ } else {
+ blockingWriter = Executors.newSingleThreadExecutor();
+ }
transport.start();
}
@@ -64,8 +62,8 @@
}
protected final void write(final Object o) {
- synchronized (outputQueue) {
- if (!blockingTransport) {
+ synchronized (transport) {
+ if (blockingWriter==null) {
try {
transport.oneway(o);
} catch (IOException e) {
@@ -130,13 +128,6 @@
public void setDispatcher(IDispatcher dispatcher) {
this.dispatcher = dispatcher;
- if (transport instanceof DispatchableTransport) {
- DispatchableTransport dt = ((DispatchableTransport) transport);
- if (name != null) {
- dt.setName(name);
- }
- dt.setDispatcher(getDispatcher());
- }
}
public int getOutputWindowSize() {
@@ -155,14 +146,6 @@
return inputResumeThreshold;
}
- public IFlowRelay<MessageDelivery> getSink() {
- return outputQueue;
- }
-
- public boolean match(MessageDelivery message) {
- return true;
- }
-
protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
public void onProtocolCredit(int credit);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Thu Mar 12 20:18:25 2009
@@ -109,10 +109,6 @@
this.uri = uri;
}
- public URI getConnectURI() {
- return transportServer.getConnectURI();
- }
-
public boolean isStopping() {
return stopping.get();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 20:18:25 2009
@@ -50,26 +50,26 @@
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.IFlowSource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.DispatchableTransport;
public class OpenwireBrokerConnection extends BrokerConnection {
-
+
protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
-
+
protected final Object inboundMutex = new Object();
protected IFlowController<MessageDelivery> inboundController;
-
- protected IFlowController<MessageDelivery> outboundController;
-// public ProtocolLimiter<MessageDelivery> outboundLimiter;
- protected Flow ouboundFlow;
+// private SingleFlowRelay<MessageDelivery> outboundQueue;
public void onCommand(Object o) {
final Command command = (Command) o;
@@ -77,152 +77,185 @@
try {
command.visit(new CommandVisitor() {
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Methods that keep track of the client state
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Response processAddConnection(ConnectionInfo info) throws Exception {
return ack(command);
}
+
public Response processAddSession(SessionInfo info) throws Exception {
return ack(command);
}
+
public Response processAddProducer(ProducerInfo info) throws Exception {
producers.put(info.getProducerId(), new ProducerContext(info));
return ack(command);
}
+
public Response processAddConsumer(ConsumerInfo info) throws Exception {
ConsumerContext ctx = new ConsumerContext(info);
consumers.put(info.getConsumerId(), ctx);
-
broker.getRouter().bind(convert(info.getDestination()), ctx);
-
-
return ack(command);
}
+
public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
return ack(command);
}
+
public Response processRemoveSession(SessionId info, long arg1) throws Exception {
return ack(command);
}
+
public Response processRemoveProducer(ProducerId info) throws Exception {
producers.remove(info);
return ack(command);
}
+
public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
return ack(command);
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Message Processing Methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Response processMessage(Message info) throws Exception {
ProducerId producerId = info.getProducerId();
ProducerContext producerContext = producers.get(producerId);
-
+
OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
-
- // Only producers that are not using a window will block, and if it blocks.
- // yes we block the connection's read thread. yes other sessions will not get
- // serviced while we block here. The producer is depending on TCP flow
- // control to slow him down so we have to stop ready from the socket at this
+
+ // Only producers that are not using a window will block,
+ // and if it blocks.
+ // yes we block the connection's read thread. yes other
+ // sessions will not get
+ // serviced while we block here. The producer is depending
+ // on TCP flow
+ // control to slow him down so we have to stop ready from
+ // the socket at this
// point.
- while( !producerContext.controller.offer(md, null) ) {
+ while (!producerContext.controller.offer(md, null)) {
producerContext.controller.waitForFlowUnblock();
}
return null;
}
+
public Response processMessageAck(MessageAck info) throws Exception {
return ack(command);
}
+
+ // Only used when client prefetch is set to zero.
public Response processMessagePull(MessagePull info) throws Exception {
return ack(command);
}
- public Response processProducerAck(ProducerAck info) throws Exception {
- return ack(command);
- }
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Control Methods
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Response processWireFormat(WireFormatInfo info) throws Exception {
return ack(command);
}
+
public Response processShutdown(ShutdownInfo info) throws Exception {
return ack(command);
}
+
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return ack(command);
}
+
public Response processFlush(FlushCommand info) throws Exception {
return ack(command);
}
+
public Response processConnectionControl(ConnectionControl info) throws Exception {
return ack(command);
}
+
public Response processConnectionError(ConnectionError info) throws Exception {
return ack(command);
}
+
public Response processConsumerControl(ConsumerControl info) throws Exception {
return ack(command);
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Methods for server management
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Response processAddDestination(DestinationInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processRemoveDestination(DestinationInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processControlCommand(ControlCommand info) throws Exception {
throw new UnsupportedOperationException();
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Methods for transaction management
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Response processBeginTransaction(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processEndTransaction(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processForgetTransaction(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
throw new UnsupportedOperationException();
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Methods for cluster operations
- ///////////////////////////////////////////////////////////////////
+ // These commands are sent to the broker when it's acting like a
+ // client to another broker.
+ // /////////////////////////////////////////////////////////////////
public Response processBrokerInfo(BrokerInfo info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processMessageDispatch(MessageDispatch info) throws Exception {
throw new UnsupportedOperationException();
}
+
public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
throw new UnsupportedOperationException();
}
+
+ public Response processProducerAck(ProducerAck info) throws Exception {
+ return ack(command);
+ }
+
});
} catch (Exception e) {
if (responseRequired) {
@@ -232,34 +265,33 @@
} else {
onException(e);
}
-
+
}
}
-
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Internal Support Methods
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
private Response ack(Command command) {
- Response rc = null;
- if( command.isResponseRequired() ) {
- rc = new Response();
+ if (command.isResponseRequired()) {
+ Response rc = new Response();
rc.setCorrelationId(command.getCommandId());
+ write(rc);
}
- return rc;
+ return null;
}
-
+
@Override
public void start() throws Exception {
super.start();
BrokerInfo info = new BrokerInfo();
info.setBrokerId(new BrokerId(broker.getName()));
info.setBrokerName(broker.getName());
- info.setBrokerURL(broker.getConnectURI().toString());
+ info.setBrokerURL(broker.getUri());
write(info);
}
-
+
static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
}
@@ -273,17 +305,78 @@
}
}
-
+ class ProducerContext {
+
+ private final ProducerInfo info;
+ private IFlowController<MessageDelivery> controller;
+ private String name;
+
+ public ProducerContext(final ProducerInfo info) {
+ this.info = info;
+ this.name = info.getProducerId().toString();
+
+ // Openwire only uses credit windows at the producer level for
+ // producers that request the feature.
+ if (info.getWindowSize() > 0) {
+ Flow flow = new Flow(info.getProducerId().toString(), false);
+ WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+ @Override
+ protected void sendCredit(int credit) {
+ ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+ write(ack);
+ }
+ };
+
+ controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ route(controller, elem);
+ }
+
+ public String toString() {
+ return name;
+ }
+ }, flow, limiter, inboundMutex);
+ } else {
+ controller = inboundController;
+ }
+ }
+ }
+
class ConsumerContext implements DeliveryTarget {
-
+
private final ConsumerInfo info;
private String name;
private BooleanExpression selector;
+ private SingleFlowRelay<MessageDelivery> queue;
+ public ProtocolLimiter<MessageDelivery> limiter;
+
public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
this.info = info;
this.name = info.getConsumerId().toString();
selector = parseSelector(info);
+
+ Flow flow = new Flow(name, false);
+ limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+ public int getElementSize(MessageDelivery m) {
+ return 1;
+ }
+ };
+ queue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound", limiter);
+ if (transport instanceof DispatchableTransport) {
+ queue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+ write(message);
+ }
+ });
+
+ } else {
+ queue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+ write(message);
+ };
+ });
+ }
}
public IFlowSink<MessageDelivery> getSink() {
@@ -293,10 +386,10 @@
public boolean match(MessageDelivery message) {
Message msg = message.asType(Message.class);
- if( msg ==null ) {
+ if (msg == null) {
return false;
}
-
+
MessageEvaluationContext selectorContext = new MessageEvaluationContext();
selectorContext.setMessageReference(msg);
selectorContext.setDestination(info.getDestination());
@@ -307,82 +400,26 @@
return false;
}
}
-
- }
- private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
- BooleanExpression rc = null;
- if (info.getSelector() != null) {
- rc = SelectorParser.parse(info.getSelector());
- }
- if (info.isNoLocal()) {
- if (rc == null) {
- rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
- } else {
- rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
- }
- }
- if (info.getAdditionalPredicate() != null) {
- if (rc == null) {
- rc = info.getAdditionalPredicate();
- } else {
- rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
- }
- }
- return rc;
}
- class ProducerContext {
-
- private final ProducerInfo info;
- private IFlowController<MessageDelivery> controller;
- private String name;
-
- public ProducerContext(final ProducerInfo info) {
- this.info = info;
- this.name = info.getProducerId().toString();
-
- // Openwire only uses credit windows at the producer level for producers that request the feature.
- if( info.getWindowSize() > 0 ) {
- Flow flow = new Flow(info.getProducerId().toString(), false);
-
- WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize()/2) {
- @Override
- protected void sendCredit(int credit) {
- ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
- write(ack);
- }
- };
-
- controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
- public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
- route(controller, elem);
- }
- public String toString() {
- return name;
- }
- }, flow, limiter, inboundMutex);
- } else {
- controller = inboundController;
- }
- }
- }
-
protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
// TODO:
- // Consider doing some caching of this target list. Most producers always send to
+ // Consider doing some caching of this target list. Most producers
+ // always send to
// the same destination.
Collection<DeliveryTarget> targets = broker.getRouter().route(elem);
-
- final Message message = ((OpenWireMessageDelivery)elem).getMessage();
- if( targets != null ) {
-
- if( message.isResponseRequired() ) {
+
+ final Message message = ((OpenWireMessageDelivery) elem).getMessage();
+ if (targets != null) {
+
+ if (message.isResponseRequired()) {
// We need to ack the message once we ensure we won't loose it.
- // We know we won't loose it once it's persisted or delivered to a consumer
+ // We know we won't loose it once it's persisted or delivered to
+ // a consumer
// Setup a callback to get notifed once one of those happens.
- if( message.isPersistent() ) {
- elem.setCompletionCallback(new Runnable(){
+ if (message.isPersistent()) {
+ elem.setCompletionCallback(new Runnable() {
public void run() {
ack(message);
}
@@ -392,81 +429,90 @@
ack(message);
}
}
-
+
// Deliver the message to all the targets..
for (DeliveryTarget dt : targets) {
if (dt.match(elem)) {
dt.getSink().add(elem, controller);
}
}
-
+
} else {
- // Let the client know we got the message even though there
+ // Let the client know we got the message even though there
// were no valid targets to deliver the message to.
- if( message.isResponseRequired() ) {
+ if (message.isResponseRequired()) {
ack(message);
}
}
controller.elementDispatched(elem);
}
-
+
protected void initialize() {
-
- // Setup the input processing..
+
+ // Setup the inbound processing..
Flow flow = new Flow(name, false);
SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
route(controller, elem);
}
+
public String toString() {
return name;
}
}, flow, limiter, inboundMutex);
-// ouboundFlow = new Flow(name, false);
-// outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
-// outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
-// outboundController = outputQueue.getFlowController(ouboundFlow);
-//
-// if (transport instanceof DispatchableTransport) {
-// outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-//
-// public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
-// write(message);
-// }
-// });
-//
-// } else {
-// blockingTransport = true;
-// blockingWriter = Executors.newSingleThreadExecutor();
-// outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-// public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
-// write(message);
-// };
-// });
-//
-// }
+// Flow ouboundFlow = new Flow(name, false);
+// SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold);
+// outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+// outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+// public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+// write(message);
+// };
+// });
}
-
+
static public Destination convert(ActiveMQDestination dest) {
- if( dest.isComposite() ) {
+ if (dest.isComposite()) {
ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
- ArrayList<Destination> d= new ArrayList<Destination>();
+ ArrayList<Destination> d = new ArrayList<Destination>();
for (int i = 0; i < compositeDestinations.length; i++) {
d.add(convert(compositeDestinations[i]));
}
return new Destination.MultiDestination(d);
- }
+ }
AsciiBuffer domain;
- if( dest.isQueue() ) {
+ if (dest.isQueue()) {
domain = Router.QUEUE_DOMAIN;
- } if( dest.isTopic() ) {
+ }
+ if (dest.isTopic()) {
domain = Router.TOPIC_DOMAIN;
} else {
- throw new IllegalArgumentException("Unsupported domain type: "+ dest);
+ throw new IllegalArgumentException("Unsupported domain type: " + dest);
}
- return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+ return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+ }
+
+ private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+ BooleanExpression rc = null;
+ if (info.getSelector() != null) {
+ rc = SelectorParser.parse(info.getSelector());
+ }
+ if (info.isNoLocal()) {
+ if (rc == null) {
+ rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+ } else {
+ rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+ }
+ }
+ if (info.getAdditionalPredicate() != null) {
+ if (rc == null) {
+ rc = info.getAdditionalPredicate();
+ } else {
+ rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+ }
+ }
+ return rc;
}
}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java?rev=752995&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java Thu Mar 12 20:18:25 2009
@@ -0,0 +1,82 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class Openwire2Support {
+
+ static private long idGenerator;
+ static private long msgIdGenerator;
+
+ public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+ return consumerInfo.createRemoveCommand();
+ }
+
+ public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
+ }
+
+ public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ public static ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+ return createMessage(producerInfo, destination, 4, null);
+ }
+
+ public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setJMSPriority(priority);
+ message.setProducerId(producerInfo.getProducerId());
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ if( payload!=null ) {
+ try {
+ message.setText(payload);
+ } catch (MessageNotWriteableException e) {
+ }
+ }
+ return message;
+ }
+
+ public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+ MessageAck ack = new MessageAck();
+ ack.setAckType(ackType);
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setDestination(msg.getDestination());
+ ack.setLastMessageId(msg.getMessageId());
+ ack.setMessageCount(count);
+ return ack;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=752995&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 20:18:25 2009
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public class OpenwireBrokerTest extends TestCase {
+
+ protected static final int PERFORMANCE_SAMPLES = 3000000;
+
+ protected static final int IO_WORK_AMOUNT = 0;
+ protected static final int FANIN_COUNT = 10;
+ protected static final int FANOUT_COUNT = 10;
+
+ protected static final int PRIORITY_LEVELS = 10;
+ protected static final boolean USE_INPUT_QUEUES = true;
+
+ // Set to put senders and consumers on separate brokers.
+ protected boolean multibroker = false;
+
+ // Set to mockup up ptp:
+ protected boolean ptp = false;
+
+ // Set to use tcp IO
+ protected boolean tcp = true;
+ // set to force marshalling even in the NON tcp case.
+ protected boolean forceMarshalling = false;
+
+ protected String sendBrokerURI;
+ protected String receiveBrokerURI;
+
+ // Set's the number of threads to use:
+ protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+ protected boolean usePartitionedQueue = false;
+
+ protected int producerCount;
+ protected int consumerCount;
+ protected int destCount;
+
+ protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+ protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+ protected Broker sendBroker;
+ protected Broker rcvBroker;
+ protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+ protected IDispatcher dispatcher;
+ protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+ final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+ final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+ static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+ public AsciiBuffer map(MessageDelivery element) {
+ return element.getMsgId();
+ }
+ };
+ static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ // we modulo 10 to have at most 10 partitions which the producers
+ // gets split across.
+ return (int) (element.getProducerId().hashCode() % 10);
+ }
+ };
+
+ @Override
+ protected void setUp() throws Exception {
+ dispatcher = createDispatcher();
+ dispatcher.start();
+ if (tcp) {
+ sendBrokerURI = "tcp://localhost:10000";
+ receiveBrokerURI = "tcp://localhost:20000";
+ } else {
+ if (forceMarshalling) {
+ sendBrokerURI = "pipe://SendBroker";
+ receiveBrokerURI = "pipe://ReceiveBroker";
+ } else {
+ sendBrokerURI = "pipe://SendBroker";
+ receiveBrokerURI = "pipe://ReceiveBroker";
+ }
+ }
+ }
+
+ protected IDispatcher createDispatcher() {
+ return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ }
+
+ public void test_10_10_10() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_1_1_0() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_1_1_1() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_10_1_10() throws Exception {
+ producerCount = FANIN_COUNT;
+ consumerCount = FANOUT_COUNT;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_10_1_1() throws Exception {
+ producerCount = FANIN_COUNT;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_1_1_10() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = FANOUT_COUNT;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_2_2_2() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Tests 2 producers sending to 1 destination with 2 consumres, but with
+ * consumers set to select only messages from each producer. 1 consumers is
+ * set to slow, the other producer should be able to send quickly.
+ *
+ * @throws Exception
+ */
+ public void test_2_2_2_SlowConsumer() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+ consumers.get(0).setThinkTime(50);
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ public void test_2_2_2_Selector() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Add properties to match producers to their consumers
+ for (int i = 0; i < consumerCount; i++) {
+ String property = "match" + i;
+ consumers.get(i).setSelector(property);
+ producers.get(i).setProperty(property);
+ }
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ private void reportRates() throws InterruptedException {
+ System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
+ private void createConnections() throws IOException, URISyntaxException {
+
+ if (multibroker) {
+ sendBroker = createBroker("SendBroker", sendBrokerURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+ brokers.add(sendBroker);
+ brokers.add(rcvBroker);
+ } else {
+ sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+ brokers.add(sendBroker);
+ }
+
+ Destination[] dests = new Destination[destCount];
+
+ for (int i = 0; i < destCount; i++) {
+ Destination.SingleDestination bean = new Destination.SingleDestination();
+ bean.setName(new AsciiBuffer("dest" + (i + 1)));
+ bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
+ dests[i] = bean;
+ if (ptp) {
+ Queue queue = createQueue(sendBroker, dests[i]);
+ sendBroker.addQueue(queue);
+ if (multibroker) {
+ queue = createQueue(rcvBroker, dests[i]);
+ rcvBroker.addQueue(queue);
+ }
+ }
+ }
+
+ for (int i = 0; i < producerCount; i++) {
+ Destination destination = dests[i % destCount];
+ RemoteProducer producer = createProducer(i, destination);
+ producers.add(producer);
+ }
+
+ for (int i = 0; i < consumerCount; i++) {
+ Destination destination = dests[i % destCount];
+ RemoteConsumer consumer = createConsumer(i, destination);
+ consumers.add(consumer);
+ }
+
+ // Create MultiBroker connections:
+ // if (multibroker) {
+ // Pipe<Message> pipe = new Pipe<Message>();
+ // sendBroker.createBrokerConnection(rcvBroker, pipe);
+ // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+ // }
+ }
+
+ private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
+ RemoteConsumer consumer = new RemoteConsumer();
+ consumer.setUri(new URI(rcvBroker.getUri()));
+ consumer.setDestination(destination);
+ consumer.setName("consumer" + (i + 1));
+ consumer.setTotalConsumerRate(totalConsumerRate);
+ consumer.setDispatcher(dispatcher);
+ return consumer;
+ }
+
+ private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
+ RemoteProducer producer = new RemoteProducer();
+ producer.setUri(new URI(sendBroker.getUri()));
+ producer.setProducerId(id + 1);
+ producer.setName("producer" + (id + 1));
+ producer.setDestination(destination);
+ producer.setMessageIdGenerator(msgIdGenerator);
+ producer.setTotalProducerRate(totalProducerRate);
+ producer.setDispatcher(dispatcher);
+ return producer;
+ }
+
+ private Queue createQueue(Broker broker, Destination destination) {
+ Queue queue = new Queue();
+ queue.setBroker(broker);
+ queue.setDestination(destination);
+ queue.setKeyExtractor(KEY_MAPPER);
+ if (usePartitionedQueue) {
+ queue.setPartitionMapper(PARTITION_MAPPER);
+ }
+ return queue;
+ }
+
+ private Broker createBroker(String name, String uri) {
+ Broker broker = new Broker();
+ broker.setName(name);
+ broker.setUri(uri);
+ broker.setDispatcher(dispatcher);
+ return broker;
+ }
+
+ private void stopServices() throws Exception {
+ for (RemoteProducer connection : producers) {
+ connection.stop();
+ }
+ for (RemoteConsumer connection : consumers) {
+ connection.stop();
+ }
+ for (Broker broker : brokers) {
+ broker.stop();
+ }
+ if (dispatcher != null) {
+ dispatcher.shutdown();
+ }
+ }
+
+ private void startServices() throws Exception {
+ for (Broker broker : brokers) {
+ broker.start();
+ }
+ for (RemoteConsumer connection : consumers) {
+ connection.start();
+ }
+
+ for (RemoteProducer connection : producers) {
+ connection.start();
+ }
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 20:18:25 2009
@@ -1,11 +1,24 @@
package org.apache.activemq.broker.openwire;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConsumerInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.Connection;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowSink;
@@ -32,27 +45,41 @@
protected final Object inboundMutex = new Object();
private FlowController<MessageDelivery> inboundController;
+
+ private ActiveMQDestination activemqDestination;
+
+ private ConnectionInfo connectionInfo;
+
+ private SessionInfo sessionInfo;
+
+ private ConsumerInfo consumerInfo;
public void start() throws Exception {
consumerRate.name("Consumer " + name + " Rate");
totalConsumerRate.add(consumerRate);
- transport = TransportFactory.compositeConnect(uri);
+ initialize();
+ transport = TransportFactory.connect(uri);
if(transport instanceof DispatchableTransport)
{
- DispatchableTransport dt = ((DispatchableTransport)transport);
- dt.setName(name + "-client-transport");
- dt.setDispatcher(getDispatcher());
schedualWait = true;
}
- transport.setTransportListener(this);
- transport.start();
+ super.start();
+
+ if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+ activemqDestination = new ActiveMQQueue(destination.getName().toString());
+ } else {
+ activemqDestination = new ActiveMQTopic(destination.getName().toString());
+ }
+
+ connectionInfo = createConnectionInfo();
+ transport.oneway(connectionInfo);
+ sessionInfo = createSessionInfo(connectionInfo);
+ transport.oneway(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
+ consumerInfo.setPrefetchSize(1000);
+ transport.oneway(consumerInfo);
- // Let the remote side know our name.
- transport.oneway(name);
- // Sending the destination acts as the subscribe.
- transport.oneway(destination);
- super.initialize();
}
protected void initialize() {
@@ -78,7 +105,10 @@
public void onCommand(Object command) {
try {
- if (command.getClass() == MessageDelivery.class) {
+ if (command.getClass() == WireFormatInfo.class) {
+ } else if (command.getClass() == BrokerInfo.class) {
+ System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+ } else if (command.getClass() == MessageDelivery.class) {
MessageDelivery msg = (MessageDelivery) command;
inboundController.add(msg, null);
} else {
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 20:18:25 2009
@@ -1,12 +1,11 @@
package org.apache.activemq.broker.openwire;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createConnectionInfo;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createMessage;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createProducerInfo;
-import static org.apache.activemq.broker.openwire.OpenWireSupport.createSessionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createMessage;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createProducerInfo;
+import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
import java.net.URI;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@@ -15,15 +14,17 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
-import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
import org.apache.activemq.flow.Flow;
@@ -35,7 +36,6 @@
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.TransportFactory;
public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery> {
@@ -61,8 +61,9 @@
private ActiveMQDestination activemqDestination;
private WindowLimiter<MessageDelivery> outboundLimiter;
-
private IFlowController<MessageDelivery> outboundController;
+
+ private SingleFlowRelay<MessageDelivery> outboundQueue;
public void start() throws Exception {
@@ -77,18 +78,9 @@
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
- transport = TransportFactory.compositeConnect(uri);
- transport.setTransportListener(this);
- if(transport instanceof DispatchableTransport)
- {
- DispatchableTransport dt = ((DispatchableTransport)transport);
- dt.setName(name + "-client-transport");
- dt.setDispatcher(getDispatcher());
- }
- super.setTransport(transport);
-
- super.initialize();
- transport.start();
+ initialize();
+ transport = TransportFactory.connect(uri);
+ super.start();
if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
activemqDestination = new ActiveMQQueue(destination.getName().toString());
@@ -111,26 +103,15 @@
protected void initialize() {
Flow ouboundFlow = new Flow(name, false);
outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
- outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
- outboundController = outputQueue.getFlowController(ouboundFlow);
-
- if (transport instanceof DispatchableTransport) {
- outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-
- public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
- write(message);
- }
- });
-
- } else {
- blockingTransport = true;
- blockingWriter = Executors.newSingleThreadExecutor();
- outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
- write(message);
- };
- });
- }
+ outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+
+ outboundController = outboundQueue.getFlowController(ouboundFlow);
+ outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+ Message msg = message.asType(Message.class);
+ write(msg);
+ }
+ });
}
public void stop() throws Exception
@@ -142,9 +123,12 @@
public void onCommand(Object command) {
try {
- if (command.getClass() == ProducerAck.class) {
+ if (command.getClass() == WireFormatInfo.class) {
+ } else if (command.getClass() == BrokerInfo.class) {
+ System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+ } else if (command.getClass() == ProducerAck.class) {
ProducerAck fc = (ProducerAck) command;
- synchronized (outputQueue) {
+ synchronized (outboundQueue) {
outboundLimiter.onProtocolCredit(fc.getSize());
}
} else {
@@ -190,7 +174,7 @@
}
}
- getSink().add(next, null);
+ outboundQueue.add(next, null);
rate.increment();
next = null;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=752995&r1=752994&r2=752995&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Thu Mar 12 20:18:25 2009
@@ -11,7 +11,6 @@
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
import org.apache.activemq.flow.Commands.Message.MessageBean;
-import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;