You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/13 17:18:00 UTC
svn commit: r753311 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/wireformat/ test/java/org/apache/activemq/br...
Author: chirino
Date: Fri Mar 13 16:17:58 2009
New Revision: 753311
URL: http://svn.apache.org/viewvc?rev=753311&view=rev
Log:
- Added the ability to discriminate the protocol used by a connection.
- The BrokerConnection now uses a ProtocolHandler once the protocol is discriminated.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi
Removed:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Fri Mar 13 16:17:58 2009
@@ -22,11 +22,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -77,7 +73,7 @@
protected void initialize() {
}
- protected final void write(final Object o) {
+ public final void write(final Object o) {
if (blockingWriter==null) {
try {
transport.oneway(o);
@@ -158,57 +154,8 @@
return inputResumeThreshold;
}
- protected interface ProtocolLimiter<E> extends IFlowLimiter<E> {
- public void onProtocolCredit(int credit);
- }
-
- protected class WindowLimiter<E> extends SizeLimiter<E> implements ProtocolLimiter<E> {
- final Flow flow;
- final boolean clientMode;
- private int available;
-
- public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
- super(capacity, resumeThreshold);
- this.clientMode = clientMode;
- this.flow = flow;
- }
-
- public void reserve(E elem) {
- super.reserve(elem);
-// if (!clientMode) {
-// System.out.println(name + " Reserved " + this);
-// }
- }
-
- public void releaseReserved(E elem) {
- super.reserve(elem);
-// if (!clientMode) {
-// System.out.println(name + " Released Reserved " + this);
-// }
- }
-
- protected void remove(int size) {
- super.remove(size);
- if (!clientMode) {
- available += size;
- if (available >= capacity - resumeThreshold) {
- sendCredit(available);
- available = 0;
- }
- }
- }
-
- protected void sendCredit(int credit) {
- throw new UnsupportedOperationException("Please override this method to provide and implemenation.");
- }
-
- public void onProtocolCredit(int credit) {
- remove(credit);
- }
-
- public int getElementSize(MessageDelivery m) {
- return m.getFlowLimiterSize();
- }
+ public Transport getTransport() {
+ return transport;
}
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,57 @@
+/**
+ *
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.SizeLimiter;
+
+/**
+ * A {@link SizeLimiter} that also accumulates protocol-level credit.
+ * <p>
+ * When the limiter is not in client mode, every size passed to
+ * {@link #remove(int)} is added to a running total; once that total reaches
+ * {@code capacity - resumeThreshold} it is handed to {@link #sendCredit(int)}
+ * and reset to zero. Credit received from the peer is fed back in through
+ * {@link #onProtocolCredit(int)}.
+ */
+public class WindowLimiter<E> extends SizeLimiter<E> {
+    final Flow flow;
+    final boolean clientMode;
+    /** Size removed since the last call to {@link #sendCredit(int)}. */
+    private int available;
+
+    /**
+     * @param clientMode      when {@code true}, {@link #remove(int)} never generates credit
+     * @param flow            the flow this limiter is associated with
+     * @param capacity        passed to the {@link SizeLimiter} constructor
+     * @param resumeThreshold passed to the {@link SizeLimiter} constructor
+     */
+    public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) {
+        super(capacity, resumeThreshold);
+        this.clientMode = clientMode;
+        this.flow = flow;
+    }
+
+    public void reserve(E elem) {
+        super.reserve(elem);
+    }
+
+    public void releaseReserved(E elem) {
+        // This used to call super.reserve(elem), which reserved a second time
+        // instead of giving the reservation back.
+        super.releaseReserved(elem);
+    }
+
+    /**
+     * Removes {@code size} from the limiter and, when not in client mode,
+     * sends the accumulated credit once it reaches
+     * {@code capacity - resumeThreshold}.
+     */
+    protected void remove(int size) {
+        super.remove(size);
+        if (!clientMode) {
+            available += size;
+            if (available >= capacity - resumeThreshold) {
+                sendCredit(available);
+                available = 0;
+            }
+        }
+    }
+
+    /**
+     * Hook for delivering credit to the peer; subclasses that run with
+     * {@code clientMode == false} are expected to override it.
+     *
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void sendCredit(int credit) {
+        throw new UnsupportedOperationException("Please override this method to provide an implementation.");
+    }
+
+    /** Applies credit received from the peer by removing that much from the limiter. */
+    public void onProtocolCredit(int credit) {
+        remove(credit);
+    }
+
+    public int getElementSize(MessageDelivery m) {
+        return m.getFlowLimiterSize();
+    }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Fri Mar 13 16:17:58 2009
@@ -22,7 +22,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Connection;
-import org.apache.activemq.broker.openwire.OpenwireBrokerConnection;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.transport.DispatchableTransportServer;
import org.apache.activemq.transport.Transport;
@@ -41,7 +40,8 @@
final HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
private TransportServer transportServer;
- private String uri;
+ private String bindUri;
+ private String connectUri;
private String name;
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
@@ -72,7 +72,7 @@
public final void start() throws Exception {
dispatcher.start();
- transportServer = TransportFactory.bind(new URI(uri));
+ transportServer = TransportFactory.bind(new URI(bindUri));
transportServer.setAcceptListener(this);
if (transportServer instanceof DispatchableTransportServer) {
((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
@@ -85,7 +85,7 @@
}
public void onAccept(final Transport transport) {
- OpenwireBrokerConnection connection = new OpenwireBrokerConnection();
+ BrokerConnection connection = new BrokerConnection();
connection.setBroker(this);
connection.setTransport(transport);
connection.setPriorityLevels(MAX_PRIORITY);
@@ -115,12 +115,12 @@
this.dispatcher = dispatcher;
}
- public String getUri() {
- return uri;
+ public String getBindUri() {
+ return bindUri;
}
- public void setUri(String uri) {
- this.uri = uri;
+ public void setBindUri(String uri) {
+ this.bindUri = uri;
}
public boolean isStopping() {
@@ -131,4 +131,14 @@
return router;
}
+
+ public String getConnectUri() {
+ return connectUri;
+ }
+
+
+ public void setConnectUri(String connectUri) {
+ this.connectUri = connectUri;
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Fri Mar 13 16:17:58 2009
@@ -17,10 +17,22 @@
package org.apache.activemq.broker;
import org.apache.activemq.Connection;
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.openwire.OpenwireProtocolHandler;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
-abstract public class BrokerConnection extends Connection {
+public class BrokerConnection extends Connection {
protected Broker broker;
+ private ProtocolHandler protocolHandler;
+
+ public interface ProtocolHandler extends Service {
+ public void setConnection(BrokerConnection connection);
+ public void onCommand(Object command);
+ public void onException(Exception error);
+ public void setWireFormat(WireFormat wf);
+ }
public Broker getBroker() {
return broker;
@@ -30,10 +42,52 @@
this.broker = broker;
}
+
@Override
public boolean isStopping() {
return super.isStopping() || broker.isStopping();
}
+    /**
+     * Dispatches an object received from the transport.
+     * <p>
+     * Until a protocol handler has been selected, the received object is cast
+     * to {@link WireFormat} and used to create, configure and start the
+     * matching handler. Once a handler exists, every command is passed
+     * straight to it. Any failure while selecting the handler is reported
+     * through {@link #onException(Exception)}.
+     *
+     * @param command the object received from the transport
+     */
+    public void onCommand(Object command) {
+        if (protocolHandler != null) {
+            protocolHandler.onCommand(command);
+            return;
+        }
+        try {
+            WireFormat wf = (WireFormat) command;
+            if (wf.getClass() != OpenWireFormat.class) {
+                // Previously an unknown wire format was silently ignored, which
+                // left the connection without a handler. Fail fast instead so
+                // the problem is reported where it happens.
+                throw new IllegalArgumentException("Unsupported wire format: " + wf.getClass().getName());
+            }
+            protocolHandler = new OpenwireProtocolHandler();
+            protocolHandler.setConnection(this);
+            protocolHandler.setWireFormat(wf);
+            protocolHandler.start();
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+ /**
+ * Handles an error raised on this connection. If a protocol handler has
+ * been selected the error is delegated to it; otherwise the stack trace is
+ * printed and the connection is stopped.
+ *
+ * @param error the failure to handle
+ */
+ @Override
+ public void onException(Exception error) {
+ if( protocolHandler!=null ) {
+ protocolHandler.onException(error);
+ } else {
+ error.printStackTrace();
+ try {
+ stop();
+ } catch (Exception ignore) {
+ // A failure while stopping is deliberately dropped here.
+ }
+ }
+ }
+
+    /**
+     * Stops the connection and then the protocol handler, if one was selected.
+     * The handler is stopped even when {@code super.stop()} throws; failures
+     * from the handler itself are ignored.
+     *
+     * @throws Exception if {@code super.stop()} fails
+     */
+    @Override
+    public void stop() throws Exception {
+        try {
+            super.stop();
+        } finally {
+            if (protocolHandler != null) {
+                try {
+                    protocolHandler.stop();
+                } catch (Exception ignored) {
+                    // Best effort: a handler failure must not mask the outcome
+                    // of stopping the connection itself.
+                }
+            }
+        }
+    }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 13 16:17:58 2009
@@ -34,7 +34,7 @@
public Destination getDestination() {
if( destination == null ) {
- destination = OpenwireBrokerConnection.convert(message.getDestination());
+ destination = OpenwireProtocolHandler.convert(message.getDestination());
}
return destination;
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.openwire;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.BrokerConnection.ProtocolHandler;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.LogicExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class OpenwireProtocolHandler implements ProtocolHandler {
+
+ protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+ protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+
+ protected final Object inboundMutex = new Object();
+ protected IFlowController<MessageDelivery> inboundController;
+
+ protected BrokerConnection connection;
+ private OpenWireFormat wireFormat;
+
+ /**
+ * Creates the connection-wide inbound flow controller. The flow is named
+ * after the connection, and the limiter is sized from the connection's
+ * input window size and input resume threshold. Elements accepted by the
+ * controller are passed to route(). The connection is dereferenced here,
+ * so setConnection() has to be called before this method.
+ */
+ public void start() throws Exception {
+ // Setup the inbound processing..
+ final Flow flow = new Flow("broker-"+connection.getName()+"-inbound", false);
+ SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+ inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ route(controller, elem);
+ }
+
+ public String toString() {
+ return flow.getFlowName();
+ }
+ }, flow, limiter, inboundMutex);
+ }
+
+ /** Currently a no-op. */
+ public void stop() throws Exception {
+ }
+
+ public void onCommand(Object o) {
+
+ final Command command = (Command) o;
+ boolean responseRequired = command.isResponseRequired();
+ try {
+ command.visit(new CommandVisitor() {
+
+ // /////////////////////////////////////////////////////////////////
+ // Methods that keep track of the client state
+ // /////////////////////////////////////////////////////////////////
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processAddSession(SessionInfo info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processAddProducer(ProducerInfo info) throws Exception {
+ producers.put(info.getProducerId(), new ProducerContext(info));
+ return ack(command);
+ }
+
+ public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ ConsumerContext ctx = new ConsumerContext(info);
+ consumers.put(info.getConsumerId(), ctx);
+ connection.getBroker().getRouter().bind(convert(info.getDestination()), ctx);
+ return ack(command);
+ }
+
+ public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+ return ack(command);
+ }
+
+ public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+ return ack(command);
+ }
+
+ public Response processRemoveProducer(ProducerId info) throws Exception {
+ producers.remove(info);
+ return ack(command);
+ }
+
+            /**
+             * Forgets the context of the removed consumer, mirroring what
+             * processRemoveProducer does for producers, then acknowledges
+             * the command.
+             */
+            public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+                // NOTE(review): the router binding created in processAddConsumer
+                // is not undone here; confirm whether the router offers a way
+                // to unbind the target.
+                consumers.remove(info);
+                return ack(command);
+            }
+
+ // /////////////////////////////////////////////////////////////////
+ // Message Processing Methods.
+ // /////////////////////////////////////////////////////////////////
+            /**
+             * Offers an incoming message to the flow controller of the
+             * producer that sent it, blocking until the controller accepts it.
+             *
+             * @throws IllegalStateException if the producer id is not registered
+             */
+            public Response processMessage(Message info) throws Exception {
+                ProducerId producerId = info.getProducerId();
+                ProducerContext producerContext = producers.get(producerId);
+                if (producerContext == null) {
+                    // Report the real cause instead of a bare NullPointerException.
+                    throw new IllegalStateException("Message received from unregistered producer: " + producerId);
+                }
+
+                OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+
+                // Only producers that are not using a window will block here.
+                // Yes, this blocks the connection's read thread, and other
+                // sessions will not get serviced while we wait. Such a producer
+                // depends on TCP flow control to slow it down, so we have to
+                // stop reading from the socket at this point.
+                while (!producerContext.controller.offer(md, null)) {
+                    producerContext.controller.waitForFlowUnblock();
+                }
+                return null;
+            }
+
+            /**
+             * Passes a message acknowledgement to the context of the consumer
+             * it refers to, then acknowledges the command.
+             *
+             * @throws IllegalStateException if the consumer id is not registered
+             */
+            public Response processMessageAck(MessageAck info) throws Exception {
+                ConsumerContext ctx = consumers.get(info.getConsumerId());
+                if (ctx == null) {
+                    // Report the real cause instead of a bare NullPointerException.
+                    throw new IllegalStateException("Ack received for unregistered consumer: " + info.getConsumerId());
+                }
+                ctx.ack(info);
+                return ack(command);
+            }
+
+ // Only used when client prefetch is set to zero.
+ public Response processMessagePull(MessagePull info) throws Exception {
+ return ack(command);
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Control Methods
+ // /////////////////////////////////////////////////////////////////
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
+
+ // Negotiate the openwire encoding options.
+ WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+ wfn.sendWireFormat();
+ wfn.negociate(info);
+
+ // Now that the encoding is negotiated.. let the client know the details about this
+ // broker.
+ BrokerInfo brokerInfo = new BrokerInfo();
+ brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
+ brokerInfo.setBrokerName(connection.getBroker().getName());
+ brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
+ connection.write(brokerInfo);
+ return ack(command);
+ }
+
+ public Response processShutdown(ShutdownInfo info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processFlush(FlushCommand info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processConnectionControl(ConnectionControl info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processConnectionError(ConnectionError info) throws Exception {
+ return ack(command);
+ }
+
+ public Response processConsumerControl(ConsumerControl info) throws Exception {
+ return ack(command);
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Methods for server management
+ // /////////////////////////////////////////////////////////////////
+ public Response processAddDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processControlCommand(ControlCommand info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Methods for transaction management
+ // /////////////////////////////////////////////////////////////////
+ public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Methods for cluster operations
+ // These commands are sent to the broker when it's acting like a
+ // client to another broker.
+ // /////////////////////////////////////////////////////////////////
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processMessageDispatch(MessageDispatch info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ public Response processProducerAck(ProducerAck info) throws Exception {
+ return ack(command);
+ }
+
+ });
+ } catch (Exception e) {
+ if (responseRequired) {
+ ExceptionResponse response = new ExceptionResponse(e);
+ response.setCorrelationId(command.getCommandId());
+ connection.write(response);
+ } else {
+ connection.onException(e);
+ }
+
+ }
+ }
+
+    /**
+     * Reports an error and shuts the connection down. Nothing happens when
+     * the connection is already stopping; otherwise the stack trace is
+     * printed and the connection is stopped from a newly started thread.
+     */
+    public void onException(Exception error) {
+        if (connection.isStopping()) {
+            return;
+        }
+        error.printStackTrace();
+        Thread stopper = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    connection.stop();
+                } catch (Exception ignore) {
+                }
+            }
+        };
+        stopper.start();
+    }
+
+ // /////////////////////////////////////////////////////////////////
+ // Internal Support Methods
+ // /////////////////////////////////////////////////////////////////
+
+    /**
+     * Writes an empty {@link Response} correlated to the given command when
+     * the command asks for one.
+     *
+     * @return always {@code null}
+     */
+    private Response ack(Command command) {
+        if (!command.isResponseRequired()) {
+            return null;
+        }
+        Response reply = new Response();
+        reply.setCorrelationId(command.getCommandId());
+        connection.write(reply);
+        return null;
+    }
+
+ /**
+ * Do-nothing FlowControllable base class: accepted elements are ignored and
+ * both the sink and the source are null. Subclasses override what they need.
+ */
+ static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ }
+
+ public IFlowSink<MessageDelivery> getFlowSink() {
+ return null;
+ }
+
+ public IFlowSource<MessageDelivery> getFlowSource() {
+ return null;
+ }
+ }
+
+ /**
+ * Per-producer state: the flow controller that messages from this producer
+ * are offered to.
+ */
+ class ProducerContext {
+
+ private IFlowController<MessageDelivery> controller;
+ private String name;
+
+ /**
+ * A producer that announces a window size greater than zero gets its own
+ * controller backed by a WindowLimiter; any other producer shares the
+ * connection-wide inbound controller.
+ */
+ public ProducerContext(final ProducerInfo info) {
+ this.name = info.getProducerId().toString();
+
+ // Openwire only uses credit windows at the producer level for
+ // producers that request the feature.
+ if (info.getWindowSize() > 0) {
+ final Flow flow = new Flow("broker-"+name+"-inbound", false);
+ // Capacity is the requested window; the resume threshold is half of it.
+ WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+ @Override
+ protected void sendCredit(int credit) {
+ // Credit is returned to the producer as a ProducerAck.
+ ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+ connection.write(ack);
+ }
+ };
+
+ controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ route(controller, elem);
+ }
+
+ public String toString() {
+ return flow.getFlowName();
+ }
+ }, flow, limiter, inboundMutex);
+ } else {
+ controller = inboundController;
+ }
+ }
+ }
+
+ /**
+ * Per-consumer state and the DeliveryTarget handed to the router: an
+ * optional selector plus an outbound relay that writes each delivered
+ * message to the connection as a MessageDispatch.
+ */
+ class ConsumerContext implements DeliveryTarget {
+
+ private final ConsumerInfo info;
+ private String name;
+ private BooleanExpression selector;
+
+ private SingleFlowRelay<MessageDelivery> queue;
+ public WindowLimiter<MessageDelivery> limiter;
+
+ /**
+ * @throws InvalidSelectorException if the consumer's selector cannot be parsed
+ */
+ public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+ this.info = info;
+ this.name = info.getConsumerId().toString();
+ selector = parseSelector(info);
+
+ Flow flow = new Flow("broker-"+name+"-outbound", false);
+ // The window is the prefetch size, with a resume threshold of half that.
+ limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
+ // Every message counts as one unit, whatever its size.
+ public int getElementSize(MessageDelivery m) {
+ return 1;
+ }
+ };
+ queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
+ queue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+ Message msg = message.asType(Message.class);
+ MessageDispatch md = new MessageDispatch();
+ md.setConsumerId(info.getConsumerId());
+ md.setMessage(msg);
+ md.setDestination(msg.getDestination());
+ connection.write(md);
+ };
+ });
+ }
+
+ /** Returns the acknowledged message count to the limiter as credit, while holding the queue's monitor. */
+ public void ack(MessageAck info) {
+ synchronized(queue) {
+ limiter.onProtocolCredit(info.getMessageCount());
+ }
+ }
+
+ public IFlowSink<MessageDelivery> getSink() {
+ return queue;
+ }
+
+ /**
+ * Returns true when the delivery can be viewed as an OpenWire Message and
+ * either no selector is set or the selector matches it. A selector that
+ * fails with a JMSException is treated as a non-match.
+ */
+ public boolean match(MessageDelivery message) {
+ Message msg = message.asType(Message.class);
+ if (msg == null) {
+ return false;
+ }
+
+ MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+ selectorContext.setMessageReference(msg);
+ selectorContext.setDestination(info.getDestination());
+ try {
+ return (selector == null || selector.matches(selectorContext));
+ } catch (JMSException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ }
+
+ /**
+ * Routes an accepted inbound delivery: asks the router for its targets,
+ * acknowledges the message to the client when a response is required, adds
+ * the delivery to the sink of every matching target, and finally tells the
+ * controller that the element has been dispatched.
+ *
+ * @param controller the source controller the element came from
+ * @param elem the delivery to route; it is cast to OpenWireMessageDelivery
+ */
+ protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ // TODO:
+ // Consider doing some caching of this target list. Most producers
+ // always send to
+ // the same destination.
+ Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(elem);
+
+ final Message message = ((OpenWireMessageDelivery) elem).getMessage();
+ if (targets != null) {
+
+ if (message.isResponseRequired()) {
+ // We need to ack the message once we ensure we won't lose it.
+ // We know we won't lose it once it's persisted or delivered to
+ // a consumer
+ // Setup a callback to get notified once one of those happens.
+ if (message.isPersistent()) {
+ elem.setCompletionCallback(new Runnable() {
+ public void run() {
+ ack(message);
+ }
+ });
+ } else {
+ // Let the client know the broker got the message.
+ ack(message);
+ }
+ }
+
+ // Deliver the message to all the targets..
+ for (DeliveryTarget dt : targets) {
+ if (dt.match(elem)) {
+ dt.getSink().add(elem, controller);
+ }
+ }
+
+ } else {
+ // Let the client know we got the message even though there
+ // were no valid targets to deliver the message to.
+ if (message.isResponseRequired()) {
+ ack(message);
+ }
+ }
+ controller.elementDispatched(elem);
+ }
+
+    /**
+     * Converts an OpenWire destination into the broker's {@link Destination}.
+     * A composite destination becomes a {@code MultiDestination} of its
+     * converted members; a queue or topic becomes a {@code SingleDestination}
+     * in the matching router domain.
+     *
+     * @param dest the OpenWire destination to convert
+     * @return the equivalent broker destination
+     * @throws IllegalArgumentException if {@code dest} is neither composite,
+     *         a queue, nor a topic
+     */
+    public static Destination convert(ActiveMQDestination dest) {
+        if (dest.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
+            ArrayList<Destination> d = new ArrayList<Destination>(compositeDestinations.length);
+            for (ActiveMQDestination member : compositeDestinations) {
+                d.add(convert(member));
+            }
+            return new Destination.MultiDestination(d);
+        }
+        final AsciiBuffer domain;
+        if (dest.isQueue()) {
+            domain = Router.QUEUE_DOMAIN;
+        } else if (dest.isTopic()) {
+            // The "else" was missing here, so a queue fell through to the
+            // final branch and was rejected as an unsupported domain.
+            domain = Router.TOPIC_DOMAIN;
+        } else {
+            throw new IllegalArgumentException("Unsupported domain type: " + dest);
+        }
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+    }
+
+    /**
+     * Builds the filter expression for a consumer by AND-ing together, where
+     * present, its parsed selector, a no-local restriction and its additional
+     * predicate.
+     *
+     * @return the combined expression, or {@code null} if none applies
+     * @throws InvalidSelectorException if the selector cannot be parsed
+     */
+    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+        BooleanExpression expr = info.getSelector() == null ? null : SelectorParser.parse(info.getSelector());
+
+        if (info.isNoLocal()) {
+            BooleanExpression noLocal = new NoLocalExpression(info.getConsumerId().getConnectionId());
+            expr = (expr == null) ? noLocal : LogicExpression.createAND(noLocal, expr);
+        }
+
+        BooleanExpression extra = info.getAdditionalPredicate();
+        if (extra != null) {
+            expr = (expr == null) ? extra : LogicExpression.createAND(extra, expr);
+        }
+        return expr;
+    }
+
+ public BrokerConnection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(BrokerConnection connection) {
+ this.connection = connection;
+ }
+
+ /**
+ * Sets the wire format used by this handler. The argument is cast to
+ * OpenWireFormat, so any other implementation fails with a
+ * ClassCastException.
+ */
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = (OpenWireFormat) wireFormat;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+ /**
+  * An {@link InputStream} that yields every byte of a first stream followed by
+  * every byte of a second stream.
+  * <p>
+  * Bulk reads are forwarded to the underlying streams and never span the
+  * boundary between them, so a read that can be satisfied from the first stream
+  * does not wait for data on the second one. Closing this stream closes both
+  * underlying streams.
+  */
+ public class ConcatInputStream extends InputStream {
+
+     private final InputStream first;
+     private final InputStream second;
+     /** Set once {@code first} reported end of stream (or when it was null to begin with). */
+     private boolean firstDone;
+
+     /**
+      * @param first  stream to drain first; may be {@code null}, in which case only
+      *               {@code second} is read
+      * @param second stream to read once {@code first} is exhausted
+      */
+     public ConcatInputStream(InputStream first, InputStream second) {
+         this.first = first;
+         this.second = second;
+         this.firstDone = (first == null);
+     }
+
+     /**
+      * Reads one byte, switching to the second stream once the first one is exhausted.
+      *
+      * @return the next byte as an int in the range 0-255, or -1 when both streams are exhausted
+      * @throws IOException if the underlying stream fails
+      */
+     @Override
+     public int read() throws IOException {
+         if (!firstDone) {
+             int rc = first.read();
+             if (rc >= 0) {
+                 return rc;
+             }
+             firstDone = true;
+         }
+         return second.read();
+     }
+
+     /**
+      * Reads up to {@code len} bytes from whichever stream is current.
+      * <p>
+      * The inherited implementation would call {@link #read()} until {@code len}
+      * bytes arrived, which can block on the second stream even though data from
+      * the first one is already available. Delegating keeps the usual
+      * "return what is there" semantics of the wrapped streams.
+      *
+      * @return the number of bytes read, or -1 when both streams are exhausted
+      * @throws IOException if the underlying stream fails
+      */
+     @Override
+     public int read(byte[] b, int off, int len) throws IOException {
+         if (b == null) {
+             throw new NullPointerException();
+         }
+         if (off < 0 || len < 0 || len > b.length - off) {
+             throw new IndexOutOfBoundsException();
+         }
+         if (len == 0) {
+             return 0;
+         }
+         if (!firstDone) {
+             int rc = first.read(b, off, len);
+             if (rc >= 0) {
+                 return rc;
+             }
+             firstDone = true;
+         }
+         return second.read(b, off, len);
+     }
+
+     /**
+      * @return the non-blocking estimate of the current stream: the first stream's
+      *         value while it still reports available bytes, otherwise the second stream's
+      * @throws IOException if the underlying stream fails
+      */
+     @Override
+     public int available() throws IOException {
+         if (!firstDone) {
+             int n = first.available();
+             if (n > 0) {
+                 return n;
+             }
+         }
+         return second.available();
+     }
+
+     /**
+      * Closes both underlying streams; the second one is closed even if closing
+      * the first one fails.
+      *
+      * @throws IOException if closing either stream fails
+      */
+     @Override
+     public void close() throws IOException {
+         try {
+             if (first != null) {
+                 first.close();
+             }
+         } finally {
+             second.close();
+         }
+     }
+
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.util.ByteSequence;
+
+public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory implements DiscriminatableWireFormatFactory {
+
+ private static final byte MAGIC[] = new byte[] {1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ if( byteSequence.length == 4+MAGIC.length ) {
+ for( int i=0; i < MAGIC.length; i++ ) {
+ if( byteSequence.data[i+4] != MAGIC[i] ) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public int maxWireformatHeaderLength() {
+ return 4+MAGIC.length;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.transport.stomp.StompWireFormatFactory;
+import org.apache.activemq.util.ByteSequence;
+
+ /**
+  * A {@link StompWireFormatFactory} that takes part in protocol discrimination.
+  * <p>
+  * NOTE(review): {@link #matchesWireformatHeader(ByteSequence)} always returns
+  * {@code false}, so this factory never claims a connection; this looks like a
+  * placeholder until real STOMP header detection is written.
+  */
+ public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
+
+ /**
+  * @param byteSequence the bytes read so far; currently ignored
+  * @return always {@code false}
+  */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ return false;
+ }
+
+ /**
+  * @return the fixed value 100
+  */
+ public int maxWireformatHeaderLength() {
+ return 100;
+ }
+
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * This should actually get merged into the WireFormatFactory class. But to avoid change to much in the core right,
+ * now it's an additional interface.
+ *
+ */
+public interface DiscriminatableWireFormatFactory extends WireFormatFactory {
+
+ int maxWireformatHeaderLength();
+
+ boolean matchesWireformatHeader(ByteSequence byteSequence);
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Fri Mar 13 16:17:58 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+
+ /**
+  * Creates wire formats that detect the protocol spoken by the peer from the
+  * first bytes it sends and then delegate to the matching protocol-specific
+  * wire format.
+  */
+ public class MultiWireFormatFactory implements WireFormatFactory{
+
+     /**
+      * A wire format that reads the stream one byte at a time until one of the
+      * configured factories recognises the header, then hands all further work
+      * to the wire format created by that factory.
+      */
+     static class MultiWireFormat implements WireFormat {
+
+         ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+         /** The discriminated wire format; {@code null} until a header has been matched. */
+         WireFormat wireFormat;
+         /** Largest header length over all factories; discrimination gives up after this many bytes. */
+         int maxHeaderLength;
+
+         /** Header bytes consumed while discriminating. */
+         private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         /** The consumed header bytes, replayed in front of the stream for the delegate. */
+         private ByteArrayInputStream peeked;
+
+         /**
+          * @return the delegate's version once the protocol is known, otherwise 0
+          */
+         public int getVersion() {
+             return wireFormat != null ? wireFormat.getVersion() : 0;
+         }
+
+         /**
+          * @return the delegate's answer once the protocol is known, otherwise {@code false}
+          */
+         public boolean inReceive() {
+             return wireFormat != null && wireFormat.inReceive();
+         }
+
+         /**
+          * Forwards the version to the discriminated wire format.
+          *
+          * @throws IllegalStateException if the protocol has not been discriminated yet
+          */
+         public void setVersion(int version) {
+             if (wireFormat == null) {
+                 throw new IllegalStateException("Cannot set the version: the protocol has not been discriminated yet.");
+             }
+             wireFormat.setVersion(version);
+         }
+
+         /**
+          * Reads the next object from the stream.
+          * <p>
+          * While the protocol is unknown, bytes are read one at a time and offered
+          * to every factory. On the first match the newly created
+          * {@link WireFormat} itself is returned. The header bytes consumed so far
+          * are replayed to that wire format on the following call.
+          *
+          * @param in the input to read from; must be an {@link InputStream} while
+          *        the protocol is being discriminated or header bytes are replayed
+          * @return the discriminated {@link WireFormat} on the call that detects
+          *         the protocol, afterwards whatever the delegate unmarshals
+          * @throws EOFException if the stream ends before the protocol is known
+          * @throws IOException if no factory matches within {@code maxHeaderLength}
+          *         bytes, if {@code in} is not an {@link InputStream} when one is
+          *         required, or if reading fails
+          */
+         public Object unmarshal(DataInput in) throws IOException {
+
+             while( wireFormat == null ) {
+
+                 int readByte = asInputStream(in).read();
+                 if( readByte < 0 ) {
+                     throw new EOFException();
+                 }
+                 baos.write(readByte);
+
+                 // Try to discriminate what we have read so far.
+                 ByteSequence header = baos.toByteSequence();
+                 for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
+                     if( wff.matchesWireformatHeader(header) ) {
+                         wireFormat = wff.createWireFormat();
+                         peeked = new ByteArrayInputStream(header);
+                         return wireFormat;
+                     }
+                 }
+
+                 if( baos.size() >= maxHeaderLength ) {
+                     throw new IOException("Could not discriminate the protocol.");
+                 }
+             }
+
+             // If we have some peeked data we need to feed that back.. Only happens
+             // for the first few bytes of the protocol header.
+             if( peeked!=null ) {
+                 in = new DataInputStream( new ConcatInputStream(peeked, asInputStream(in)) );
+                 Object rc = wireFormat.unmarshal(in);
+                 if( peeked.available() <= 0 ) {
+                     peeked=null;
+                 }
+                 return rc;
+             }
+
+             return wireFormat.unmarshal(in);
+         }
+
+         /** Views the DataInput as a byte stream, failing with a descriptive error if that is impossible. */
+         private static InputStream asInputStream(DataInput in) throws IOException {
+             if( !(in instanceof InputStream) ) {
+                 throw new IOException("Protocol discrimination requires an InputStream based DataInput.");
+             }
+             return (InputStream) in;
+         }
+
+         /**
+          * Marshals the command with the discriminated wire format.
+          *
+          * @throws IOException if the protocol has not been discriminated yet or
+          *         the delegate fails
+          */
+         public void marshal(Object command, DataOutput out) throws IOException {
+             if( wireFormat == null ) {
+                 throw new IOException("Cannot marshal: the protocol has not been discriminated yet.");
+             }
+             wireFormat.marshal(command, out);
+         }
+
+         /** Not supported: discrimination only works on streams. */
+         public ByteSequence marshal(Object command) throws IOException {
+             throw new UnsupportedOperationException();
+         }
+
+         /** Not supported: discrimination only works on streams. */
+         public Object unmarshal(ByteSequence packet) throws IOException {
+             throw new UnsupportedOperationException();
+         }
+
+         public ArrayList<DiscriminatableWireFormatFactory> getWireFormatFactories() {
+             return wireFormatFactories;
+         }
+
+         /**
+          * Sets the candidate factories and recomputes the maximum number of
+          * header bytes to read before giving up.
+          */
+         public void setWireFormatFactories(ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories) {
+             this.wireFormatFactories = wireFormatFactories;
+             maxHeaderLength=0;
+             for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
+                 maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
+             }
+         }
+     }
+
+     /**
+      * @return a new discriminating wire format that knows the STOMP and
+      *         OpenWire factories, consulted in that order
+      */
+     public WireFormat createWireFormat() {
+         MultiWireFormat rc = new MultiWireFormat();
+         ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+         wireFormatFactories.add(new DiscriminatableStompWireFormatFactory());
+         wireFormatFactories.add(new DiscriminatableOpenWireFormatFactory());
+         rc.setWireFormatFactories(wireFormatFactories);
+         return rc;
+     }
+
+ }
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Fri Mar 13 16:17:58 2009
@@ -59,8 +59,10 @@
// set to force marshalling even in the NON tcp case.
protected boolean forceMarshalling = false;
- protected String sendBrokerURI;
- protected String receiveBrokerURI;
+ protected String sendBrokerBindURI;
+ protected String receiveBrokerBindURI;
+ protected String sendBrokerConnectURI;
+ protected String receiveBrokerConnectURI;
// Set's the number of threads to use:
protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
@@ -101,16 +103,20 @@
dispatcher = createDispatcher();
dispatcher.start();
if (tcp) {
- sendBrokerURI = "tcp://localhost:10000";
- receiveBrokerURI = "tcp://localhost:20000";
+ sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
+ receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
+ sendBrokerConnectURI = "tcp://localhost:10000";
+ receiveBrokerConnectURI = "tcp://localhost:20000";
} else {
if (forceMarshalling) {
- sendBrokerURI = "pipe://SendBroker";
- receiveBrokerURI = "pipe://ReceiveBroker";
+ sendBrokerBindURI = "pipe://SendBroker";
+ receiveBrokerBindURI = "pipe://ReceiveBroker";
} else {
- sendBrokerURI = "pipe://SendBroker";
- receiveBrokerURI = "pipe://ReceiveBroker";
+ sendBrokerBindURI = "pipe://SendBroker";
+ receiveBrokerBindURI = "pipe://ReceiveBroker";
}
+ sendBrokerConnectURI = sendBrokerBindURI;
+ receiveBrokerConnectURI = receiveBrokerBindURI;
}
}
@@ -370,12 +376,12 @@
private void createConnections() throws IOException, URISyntaxException {
if (multibroker) {
- sendBroker = createBroker("SendBroker", sendBrokerURI);
- rcvBroker = createBroker("RcvBroker", receiveBrokerURI);
+ sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
brokers.add(sendBroker);
brokers.add(rcvBroker);
} else {
- sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI);
+ sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
brokers.add(sendBroker);
}
@@ -425,7 +431,7 @@
}
}
};
- consumer.setUri(new URI(rcvBroker.getUri()));
+ consumer.setUri(new URI(rcvBroker.getConnectUri()));
consumer.setDestination(destination);
consumer.setName("consumer" + (i + 1));
consumer.setTotalConsumerRate(totalConsumerRate);
@@ -442,7 +448,7 @@
}
}
};
- producer.setUri(new URI(sendBroker.getUri()));
+ producer.setUri(new URI(sendBroker.getConnectUri()));
producer.setProducerId(id + 1);
producer.setName("producer" + (id + 1));
producer.setDestination(destination);
@@ -463,10 +469,11 @@
return queue;
}
- private Broker createBroker(String name, String uri) {
+ private Broker createBroker(String name, String bindURI, String connectUri) {
Broker broker = new Broker();
broker.setName(name);
- broker.setUri(uri);
+ broker.setBindUri(bindURI);
+ broker.setConnectUri(connectUri);
broker.setDispatcher(dispatcher);
return broker;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Fri Mar 13 16:17:58 2009
@@ -8,6 +8,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.activemq.Connection;
+import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753311&r1=753310&r2=753311&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Fri Mar 13 16:17:58 2009
@@ -11,6 +11,7 @@
import javax.jms.JMSException;
import org.apache.activemq.Connection;
+import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi?rev=753311&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi Fri Mar 13 16:17:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.wireformat.MultiWireFormatFactory