You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 04:18:08 UTC
svn commit: r783607 [1/2] - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/broker/openwire/
activemq-bio/src/main/java/org/apache/activemq/transport/tcp/
activemq-broker/src/main/java/org/apache/activemq/apollo/ acti...
Author: cmacnaug
Date: Thu Jun 11 02:18:07 2009
New Revision: 783607
URL: http://svn.apache.org/viewvc?rev=783607&view=rev
Log:
Cleaning up MultiWireFormatFactory a bit (So that BrokerConnection works without it).
Also some cleanup in OpenWireProtocolHandler to avoid broker exception on connection close
Removed:
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java
Modified:
activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Jun 11 02:18:07 2009
@@ -15,6 +15,14 @@
protected RemoteConsumer createConsumer() {
return new OpenwireRemoteConsumer();
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
+ */
+ @Override
+ protected String getRemoteWireFormat() {
+ return "openwire";
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Thu Jun 11 02:18:07 2009
@@ -6,10 +6,10 @@
import java.io.IOException;
-import org.apache.activemq.WindowLimiter;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.apollo.WindowLimiter;
+import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.RemoteConsumer;
-import org.apache.activemq.broker.Router;
+import org.apache.activemq.apollo.broker.Router;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu Jun 11 02:18:07 2009
@@ -9,10 +9,10 @@
import javax.jms.JMSException;
-import org.apache.activemq.WindowLimiter;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.apollo.WindowLimiter;
+import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.RemoteProducer;
-import org.apache.activemq.broker.Router;
+import org.apache.activemq.apollo.broker.Router;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
Modified: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jun 11 02:18:07 2009
@@ -611,4 +611,9 @@
}
});
}
+
+ public WireFormat getWireformat()
+ {
+ return wireFormat;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Thu Jun 11 02:18:07 2009
@@ -52,6 +52,7 @@
public void start() throws Exception {
transport.setTransportListener(this);
+
if (transport instanceof DispatchableTransport) {
DispatchableTransport dt = ((DispatchableTransport) transport);
if (name != null) {
@@ -146,6 +147,10 @@
}
}
+ public void setStopping() {
+ stopping.set(true);
+ }
+
public boolean isStopping() {
return stopping.get();
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Thu Jun 11 02:18:07 2009
@@ -22,7 +22,7 @@
import org.apache.activemq.apollo.broker.protocol.ProtocolHandler;
import org.apache.activemq.apollo.broker.protocol.ProtocolHandlerFactory;
import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.wireformat.MultiWireFormatFactory.WireFormatConnected;
+import org.apache.activemq.wireformat.WireFormat;
public class BrokerConnection extends Connection {
@@ -60,17 +60,15 @@
protocolHandler.onCommand(command);
} else {
try {
-
- WireFormatConnected wfconnected = (WireFormatConnected) command;
- String wfName = wfconnected.getWireFormatFactory().wireformatName();
+ WireFormat wireformat = transport.getWireformat();
try {
- protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wfName);
+ protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
} catch(Exception e) {
- throw IOExceptionSupport.create("No protocol handler available for: "+wfName, e);
+ throw IOExceptionSupport.create("No protocol handler available for: "+wireformat.getName(), e);
}
protocolHandler.setConnection(this);
- protocolHandler.setWireFormat(wfconnected.getWireFormat());
+ protocolHandler.setWireFormat(wireformat);
protocolHandler.start();
setExceptionListener(new ExceptionListener(){
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu Jun 11 02:18:07 2009
@@ -40,7 +40,7 @@
public abstract class BrokerTestBase extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 5;
+ protected static final int PERFORMANCE_SAMPLES = 3;
protected static final int IO_WORK_AMOUNT = 0;
protected static final int FANIN_COUNT = 10;
@@ -63,7 +63,7 @@
// Set to use tcp IO
protected boolean tcp = true;
// set to force marshalling even in the NON tcp case.
- protected boolean forceMarshalling = false;
+ protected boolean forceMarshalling = true;
protected String sendBrokerBindURI;
protected String receiveBrokerBindURI;
@@ -95,24 +95,37 @@
protected void setUp() throws Exception {
dispatcher = createDispatcher();
dispatcher.start();
+
+ String brokerWireFormat = getRemoteWireFormat();
+ if(getSupportedWireFormats() != null)
+ {
+ brokerWireFormat= "multi&wireFormat.wireFormats=" + getSupportedWireFormats();
+ }
+
if (tcp) {
- sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
- receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
- sendBrokerConnectURI = "tcp://localhost:10000";
- receiveBrokerConnectURI = "tcp://localhost:20000";
+ sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + brokerWireFormat;
+ receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + brokerWireFormat;
+ sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+ receiveBrokerConnectURI = "tcp://localhost:20000" + getRemoteWireFormat();
} else {
+ sendBrokerConnectURI = "pipe://SendBroker";
+ receiveBrokerConnectURI = "pipe://ReceiveBroker";
if (forceMarshalling) {
- sendBrokerBindURI = "pipe://SendBroker";
- receiveBrokerBindURI = "pipe://ReceiveBroker";
+ sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
+ receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
} else {
- sendBrokerBindURI = "pipe://SendBroker";
- receiveBrokerBindURI = "pipe://ReceiveBroker";
+ sendBrokerBindURI = sendBrokerConnectURI;
+ receiveBrokerBindURI = receiveBrokerConnectURI;
}
- sendBrokerConnectURI = sendBrokerBindURI;
- receiveBrokerConnectURI = receiveBrokerBindURI;
}
}
+ protected String getSupportedWireFormats() {
+ return null;
+ }
+
+ protected abstract String getRemoteWireFormat();
+
protected IDispatcher createDispatcher() {
return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", MessageBroker.MAX_PRIORITY, asyncThreadPoolSize);
}
Modified: activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Thu Jun 11 02:18:07 2009
@@ -26,7 +26,7 @@
public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
- private final boolean DEBUG = true;
+ private final boolean DEBUG = false;
//TODO: Added plumbing for periodic rebalancing which we should
//consider implementing
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 11 02:18:07 2009
@@ -17,7 +17,6 @@
package org.apache.activemq.broker.openwire;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -98,227 +97,238 @@
private OpenWireFormat storeWireFormat;
private Router router;
private VirtualHost host;
+ private final CommandVisitor visitor;
public OpenwireProtocolHandler() {
setStoreWireFormat(new OpenWireFormat());
- }
+ visitor = new CommandVisitor() {
- public void start() throws Exception {
+ // /////////////////////////////////////////////////////////////////
+ // Methods that keep track of the client state
+ // /////////////////////////////////////////////////////////////////
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+ connection.setName(info.getClientId());
+ return ack(info);
+ }
- }
+ public Response processAddSession(SessionInfo info) throws Exception {
+ return ack(info);
+ }
- public void stop() throws Exception {
- }
+ public Response processAddProducer(ProducerInfo info) throws Exception {
+ producers.put(info.getProducerId(), new ProducerContext(info));
+ return ack(info);
+ }
- public void onCommand(Object o) {
+ public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ ConsumerContext ctx = new ConsumerContext(info);
+ consumers.put(info.getConsumerId(), ctx);
+ return ack(info);
+ }
- final Command command = (Command) o;
- boolean responseRequired = command.isResponseRequired();
- try {
- command.visit(new CommandVisitor() {
+ public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+ return null;
+ }
- // /////////////////////////////////////////////////////////////////
- // Methods that keep track of the client state
- // /////////////////////////////////////////////////////////////////
- public Response processAddConnection(ConnectionInfo info) throws Exception {
- connection.setName(info.getClientId());
- return ack(command);
- }
+ public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+ return null;
+ }
- public Response processAddSession(SessionInfo info) throws Exception {
- return ack(command);
- }
+ public Response processRemoveProducer(ProducerId info) throws Exception {
+ producers.remove(info);
+ return null;
+ }
- public Response processAddProducer(ProducerInfo info) throws Exception {
- producers.put(info.getProducerId(), new ProducerContext(info));
- return ack(command);
+ public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+ ConsumerContext ctx = consumers.remove(info);
+ if (ctx == null) {
+
}
+ return null;
+ }
- public Response processAddConsumer(ConsumerInfo info) throws Exception {
- ConsumerContext ctx = new ConsumerContext(info);
- consumers.put(info.getConsumerId(), ctx);
- return ack(command);
- }
+ // /////////////////////////////////////////////////////////////////
+ // Message Processing Methods.
+ // /////////////////////////////////////////////////////////////////
+ public Response processMessage(Message info) throws Exception {
+ ProducerId producerId = info.getProducerId();
+ ProducerContext producerContext = producers.get(producerId);
- public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
- return ack(command);
- }
+ OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+ md.setStoreWireFormat(storeWireFormat);
+ md.setPersistListener(OpenwireProtocolHandler.this);
- public Response processRemoveSession(SessionId info, long arg1) throws Exception {
- return ack(command);
+ // Only producers that are not using a window will block,
+ // and if it blocks.
+ // yes we block the connection's read thread. yes other
+ // sessions will not get
+ // serviced while we block here. The producer is depending
+ // on TCP flow
+ // control to slow him down so we have to stop ready from
+ // the socket at this
+ // point.
+ while (!producerContext.controller.offer(md, null)) {
+ producerContext.controller.waitForFlowUnblock();
}
+ return null;
+ }
- public Response processRemoveProducer(ProducerId info) throws Exception {
- producers.remove(info);
- return ack(command);
- }
+ public Response processMessageAck(MessageAck info) throws Exception {
+ ConsumerContext ctx = consumers.get(info.getConsumerId());
+ ctx.ack(info);
+ return ack(info);
+ }
- public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
- return ack(command);
- }
+ // Only used when client prefetch is set to zero.
+ public Response processMessagePull(MessagePull info) throws Exception {
+ return ack(info);
+ }
- // /////////////////////////////////////////////////////////////////
- // Message Processing Methods.
- // /////////////////////////////////////////////////////////////////
- public Response processMessage(Message info) throws Exception {
- ProducerId producerId = info.getProducerId();
- ProducerContext producerContext = producers.get(producerId);
-
- OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
- md.setStoreWireFormat(storeWireFormat);
- md.setPersistListener(OpenwireProtocolHandler.this);
-
- // Only producers that are not using a window will block,
- // and if it blocks.
- // yes we block the connection's read thread. yes other
- // sessions will not get
- // serviced while we block here. The producer is depending
- // on TCP flow
- // control to slow him down so we have to stop ready from
- // the socket at this
- // point.
- while (!producerContext.controller.offer(md, null)) {
- producerContext.controller.waitForFlowUnblock();
- }
- return null;
- }
+ // /////////////////////////////////////////////////////////////////
+ // Control Methods
+ // /////////////////////////////////////////////////////////////////
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
- public Response processMessageAck(MessageAck info) throws Exception {
- ConsumerContext ctx = consumers.get(info.getConsumerId());
- ctx.ack(info);
- return ack(command);
- }
+ // Negotiate the openwire encoding options.
+ WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+ wfn.sendWireFormat();
+ wfn.negociate(info);
- // Only used when client prefetch is set to zero.
- public Response processMessagePull(MessagePull info) throws Exception {
- return ack(command);
- }
+ // Now that the encoding is negotiated.. let the client know
+ // the details about this
+ // broker.
+ BrokerInfo brokerInfo = new BrokerInfo();
+ brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
+ brokerInfo.setBrokerName(connection.getBroker().getName());
+ brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
+ connection.write(brokerInfo);
+ return ack(info);
+ }
- // /////////////////////////////////////////////////////////////////
- // Control Methods
- // /////////////////////////////////////////////////////////////////
- public Response processWireFormat(WireFormatInfo info) throws Exception {
-
- // Negotiate the openwire encoding options.
- WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
- wfn.sendWireFormat();
- wfn.negociate(info);
-
- // Now that the encoding is negotiated.. let the client know
- // the details about this
- // broker.
- BrokerInfo brokerInfo = new BrokerInfo();
- brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
- brokerInfo.setBrokerName(connection.getBroker().getName());
- brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
- connection.write(brokerInfo);
- return ack(command);
- }
+ public Response processShutdown(ShutdownInfo info) throws Exception {
+ connection.setStopping();
+ return ack(info);
+ }
- public Response processShutdown(ShutdownInfo info) throws Exception {
- return ack(command);
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+ if (info.isResponseRequired()) {
+ info.setResponseRequired(false);
+ connection.write(info);
}
+ return null;
+ }
- public Response processKeepAlive(KeepAliveInfo info) throws Exception {
- if (info.isResponseRequired()) {
- info.setResponseRequired(false);
- connection.write(info);
- }
- return null;
- }
+ public Response processFlush(FlushCommand info) throws Exception {
+ return ack(info);
+ }
- public Response processFlush(FlushCommand info) throws Exception {
- return ack(command);
- }
+ public Response processConnectionControl(ConnectionControl info) throws Exception {
+ return ack(info);
+ }
- public Response processConnectionControl(ConnectionControl info) throws Exception {
- return ack(command);
- }
+ public Response processConnectionError(ConnectionError info) throws Exception {
+ return ack(info);
+ }
- public Response processConnectionError(ConnectionError info) throws Exception {
- return ack(command);
- }
+ public Response processConsumerControl(ConsumerControl info) throws Exception {
+ return ack(info);
+ }
- public Response processConsumerControl(ConsumerControl info) throws Exception {
- return ack(command);
- }
+ // /////////////////////////////////////////////////////////////////
+ // Methods for server management
+ // /////////////////////////////////////////////////////////////////
+ public Response processAddDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- // /////////////////////////////////////////////////////////////////
- // Methods for server management
- // /////////////////////////////////////////////////////////////////
- public Response processAddDestination(DestinationInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processRemoveDestination(DestinationInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processControlCommand(ControlCommand info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processControlCommand(ControlCommand info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ // /////////////////////////////////////////////////////////////////
+ // Methods for transaction management
+ // /////////////////////////////////////////////////////////////////
+ public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- // /////////////////////////////////////////////////////////////////
- // Methods for transaction management
- // /////////////////////////////////////////////////////////////////
- public Response processBeginTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processEndTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processRecoverTransactions(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ // /////////////////////////////////////////////////////////////////
+ // Methods for cluster operations
+ // These commands are sent to the broker when it's acting like a
+ // client to another broker.
+ // /////////////////////////////////////////////////////////////////
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- // /////////////////////////////////////////////////////////////////
- // Methods for cluster operations
- // These commands are sent to the broker when it's acting like a
- // client to another broker.
- // /////////////////////////////////////////////////////////////////
- public Response processBrokerInfo(BrokerInfo info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processMessageDispatch(MessageDispatch info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processMessageDispatch(MessageDispatch info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+ throw new UnsupportedOperationException();
+ }
- public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
- throw new UnsupportedOperationException();
- }
+ public Response processProducerAck(ProducerAck info) throws Exception {
+ return ack(info);
+ }
+ };
+ }
- public Response processProducerAck(ProducerAck info) throws Exception {
- return ack(command);
- }
+ public void start() throws Exception {
+
+ }
- });
+ public void stop() throws Exception {
+ }
+
+ public void onCommand(Object o) {
+
+ final Command command = (Command) o;
+ boolean responseRequired = command.isResponseRequired();
+ try {
+ Response response = command.visit(visitor);
+
+ if (responseRequired && response == null) {
+ ack(command);
+ }
+
} catch (Exception e) {
if (responseRequired) {
ExceptionResponse response = new ExceptionResponse(e);
@@ -439,7 +449,7 @@
controller.useOverFlowQueue(false);
controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
super.onFlowOpened(controller);
-
+
BrokerSubscription sub = host.createSubscription(this);
sub.connect(this);
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Jun 11 02:18:07 2009
@@ -44,6 +44,7 @@
public final class OpenWireFormat implements WireFormat {
public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
+ public static final String WIREFORMAT_NAME = "openwire";
static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@@ -113,6 +114,10 @@
public int getVersion() {
return version;
}
+
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
public synchronized ByteSequence marshal(Object command) throws IOException {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.openwire;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@@ -136,4 +137,25 @@
long maxInactivityDurationInitalDelay) {
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jun 11 02:18:07 2009
@@ -48,6 +48,7 @@
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,13 +93,12 @@
private Exception connectionFailure;
private boolean firstConnection = true;
//optionally always have a backup created
- private boolean backup=false;
- private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
- private int backupPoolSize=1;
+ private boolean backup = false;
+ private List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
+ private int backupPoolSize = 1;
private boolean trackMessages = false;
private int maxCacheSize = 128 * 1024;
private TransportListener disposedListener = new DefaultTransportListener() {};
-
private final TransportListener myTransportListener = createTransportListener();
@@ -108,27 +108,27 @@
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
- boolean result=false;
- boolean buildBackup=true;
- boolean doReconnect = !disposed;
- synchronized(backupMutex) {
- if (connectedTransport.get()==null && !disposed) {
- result=doReconnect();
- buildBackup=false;
- }
- }
- if(buildBackup) {
- buildBackups();
- }else {
- //build backups on the next iteration
- result=true;
- try {
+ boolean result = false;
+ boolean buildBackup = true;
+ boolean doReconnect = !disposed;
+ synchronized (backupMutex) {
+ if (connectedTransport.get() == null && !disposed) {
+ result = doReconnect();
+ buildBackup = false;
+ }
+ }
+ if (buildBackup) {
+ buildBackups();
+ } else {
+ //build backups on the next iteration
+ result = true;
+ try {
reconnectTask.wakeup();
} catch (InterruptedException e) {
LOG.debug("Reconnect task has been interrupted.", e);
}
- }
- return result;
+ }
+ return result;
}
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@@ -137,22 +137,22 @@
TransportListener createTransportListener() {
return new TransportListener() {
public void onCommand(Object o) {
- Command command = (Command)o;
+ Command command = (Command) o;
if (command == null) {
return;
}
if (command.isResponse()) {
Object object = null;
- synchronized(requestMap) {
- object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+ synchronized (requestMap) {
+ object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
}
if (object != null && object.getClass() == Tracked.class) {
- ((Tracked)object).onResponses();
+ ((Tracked) object).onResponses();
}
}
if (!initialized) {
if (command.isBrokerInfo()) {
- BrokerInfo info = (BrokerInfo)command;
+ BrokerInfo info = (BrokerInfo) command;
BrokerInfo[] peers = info.getPeerBrokerInfos();
if (peers != null) {
for (int i = 0; i < peers.length; i++) {
@@ -192,28 +192,27 @@
};
}
-
public final void handleTransportFailure(IOException e) throws InterruptedException {
-
+
Transport transport = connectedTransport.getAndSet(null);
- if( transport!=null ) {
-
+ if (transport != null) {
+
transport.setTransportListener(disposedListener);
ServiceSupport.dispose(transport);
-
+
synchronized (reconnectMutex) {
boolean reconnectOk = false;
- if(started) {
- LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
+ if (started) {
+ LOG.warn("Transport failed to " + connectedTransportURI + " , attempting to automatically reconnect due to: " + e);
LOG.debug("Transport failed with the following exception:", e);
reconnectOk = true;
}
-
+
initialized = false;
- failedConnectTransportURI=connectedTransportURI;
+ failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
- connected=false;
- if(reconnectOk) {
+ connected = false;
+ if (reconnectOk) {
reconnectTask.wakeup();
}
}
@@ -243,7 +242,7 @@
}
public void stop() throws Exception {
- Transport transportToStop=null;
+ Transport transportToStop = null;
synchronized (reconnectMutex) {
LOG.debug("Stopped.");
if (!started) {
@@ -252,7 +251,7 @@
started = false;
disposed = true;
connected = false;
- for (BackupTransport t:backups) {
+ for (BackupTransport t : backups) {
t.setDisposed(true);
}
backups.clear();
@@ -266,7 +265,7 @@
sleepMutex.notifyAll();
}
reconnectTask.shutdown();
- if( transportToStop!=null ) {
+ if (transportToStop != null) {
transportToStop.stop();
}
}
@@ -320,14 +319,14 @@
}
public long getTimeout() {
- return timeout;
- }
+ return timeout;
+ }
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
- /**
+ /**
* @return Returns the randomize.
*/
public boolean isRandomize() {
@@ -335,29 +334,30 @@
}
/**
- * @param randomize The randomize to set.
+ * @param randomize
+ * The randomize to set.
*/
public void setRandomize(boolean randomize) {
this.randomize = randomize;
}
-
+
public boolean isBackup() {
- return backup;
- }
+ return backup;
+ }
+
+ public void setBackup(boolean backup) {
+ this.backup = backup;
+ }
+
+ public int getBackupPoolSize() {
+ return backupPoolSize;
+ }
+
+ public void setBackupPoolSize(int backupPoolSize) {
+ this.backupPoolSize = backupPoolSize;
+ }
- public void setBackup(boolean backup) {
- this.backup = backup;
- }
-
- public int getBackupPoolSize() {
- return backupPoolSize;
- }
-
- public void setBackupPoolSize(int backupPoolSize) {
- this.backupPoolSize = backupPoolSize;
- }
-
- public boolean isTrackMessages() {
+ public boolean isTrackMessages() {
return trackMessages;
}
@@ -372,30 +372,29 @@
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
-
+
/**
- * @return Returns true if the command is one sent when a connection
- * is being closed.
+ * @return Returns true if the command is one sent when a connection is
+ * being closed.
*/
private boolean isShutdownCommand(Command command) {
- return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
+ return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
}
-
public void oneway(Object o) throws IOException {
-
- Command command = (Command)o;
+
+ Command command = (Command) o;
Exception error = null;
try {
synchronized (reconnectMutex) {
-
+
if (isShutdownCommand(command) && connectedTransport.get() == null) {
- if(command.isShutdownInfo()) {
+ if (command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
return;
}
- if(command instanceof RemoveInfo) {
+ if (command instanceof RemoveInfo) {
// Simulate response to RemoveInfo command
stateTracker.track(command);
Response response = new Response();
@@ -412,15 +411,13 @@
Transport transport = connectedTransport.get();
long start = System.currentTimeMillis();
boolean timedout = false;
- while (transport == null && !disposed
- && connectionFailure == null
- && !Thread.currentThread().isInterrupted()) {
+ while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) {
LOG.trace("Waiting for transport to reconnect.");
long end = System.currentTimeMillis();
if (timeout > 0 && (end - start > timeout)) {
- timedout = true;
- LOG.info("Failover timed out after " + (end - start) + "ms");
- break;
+ timedout = true;
+ LOG.info("Failover timed out after " + (end - start) + "ms");
+ break;
}
try {
reconnectMutex.wait(100);
@@ -439,8 +436,8 @@
} else if (connectionFailure != null) {
error = connectionFailure;
} else if (timedout == true) {
- error = new IOException("Failover timeout of " + timeout + " ms reached.");
- }else {
+ error = new IOException("Failover timeout of " + timeout + " ms reached.");
+ } else {
error = new IOException("Unexpected failure.");
}
break;
@@ -451,7 +448,7 @@
// then hold it in the requestMap so that we can replay
// it later.
Tracked tracked = stateTracker.track(command);
- synchronized(requestMap) {
+ synchronized (requestMap) {
if (tracked != null && tracked.isWaitingForResponse()) {
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if (tracked == null && command.isResponseRequired()) {
@@ -488,7 +485,7 @@
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
+ LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
}
handleTransportFailure(e);
}
@@ -502,7 +499,7 @@
if (!disposed) {
if (error != null) {
if (error instanceof IOException) {
- throw (IOException)error;
+ throw (IOException) error;
}
throw IOExceptionSupport.create(error);
}
@@ -574,7 +571,7 @@
if (randomize) {
// Randomly, reorder the list by random swapping
for (int i = 0; i < l.size(); i++) {
- int p = (int) (Math.random()*100 % l.size());
+ int p = (int) (Math.random() * 100 % l.size());
URI t = l.get(p);
l.set(p, l.get(i));
l.set(i, t);
@@ -592,7 +589,7 @@
}
public void setTransportListener(TransportListener commandListener) {
- synchronized(listenerMutex) {
+ synchronized (listenerMutex) {
this.transportListener = commandListener;
listenerMutex.notifyAll();
}
@@ -604,7 +601,7 @@
return target.cast(this);
}
Transport transport = connectedTransport.get();
- if ( transport != null) {
+ if (transport != null) {
return transport.narrow(target);
}
return null;
@@ -619,7 +616,7 @@
t.oneway(cc);
stateTracker.restore(t);
Map tmpMap = null;
- synchronized(requestMap) {
+ synchronized (requestMap) {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
@@ -645,7 +642,7 @@
public String getRemoteAddress() {
Transport transport = connectedTransport.get();
- if ( transport != null) {
+ if (transport != null) {
return transport.getRemoteAddress();
}
return null;
@@ -654,13 +651,13 @@
public boolean isFaultTolerant() {
return true;
}
-
+
public boolean isUseInactivityMonitor() {
//this is up to the underlying transport:
return false;
}
-
- final boolean doReconnect() {
+
+ final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
@@ -678,32 +675,32 @@
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
- synchronized(backupMutex) {
+ synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
- BackupTransport bt = backups.remove(0);
+ BackupTransport bt = backups.remove(0);
Transport t = bt.getTransport();
URI uri = bt.getUri();
t.setTransportListener(myTransportListener);
try {
- if (started) {
- restoreTransport(t);
+ if (started) {
+ restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
- failedConnectTransportURI=null;
+ failedConnectTransportURI = null;
connectedTransportURI = uri;
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
- }catch (Exception e) {
- LOG.debug("Backup transport failed",e);
- }
+ } catch (Exception e) {
+ LOG.debug("Backup transport failed", e);
+ }
}
}
-
+
Iterator<URI> iter = connectList.iterator();
- while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
+ while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
URI uri = iter.next();
Transport t = null;
try {
@@ -711,7 +708,7 @@
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
-
+
if (started) {
restoreTransport(t);
}
@@ -722,36 +719,37 @@
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
- // Make sure on initial startup, that the transportListener
- // has been initialized for this instance.
- synchronized(listenerMutex) {
- if (transportListener==null) {
+ // Make sure on initial startup, that the transportListener
+ // has been initialized for this instance.
+ synchronized (listenerMutex) {
+ if (transportListener == null) {
try {
//if it isn't set after 2secs - it
//probably never will be
listenerMutex.wait(2000);
- }catch(InterruptedException ex) {}
+ } catch (InterruptedException ex) {
+ }
}
}
if (transportListener != null) {
transportListener.transportResumed();
- }else {
+ } else {
LOG.debug("transport resumed by transport listener not set");
}
if (firstConnection) {
- firstConnection=false;
+ firstConnection = false;
LOG.info("Successfully connected to " + uri);
- }else {
+ } else {
LOG.info("Successfully reconnected to " + uri);
}
- connected=true;
+ connected = true;
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
- if (t!=null) {
+ if (t != null) {
try {
- t.stop();
+ t.stop();
} catch (Exception ee) {
LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
}
@@ -760,29 +758,29 @@
}
}
}
-
+
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
-
+
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
- synchronized(listenerMutex) {
- if (transportListener==null) {
+ synchronized (listenerMutex) {
+ if (transportListener == null) {
try {
listenerMutex.wait(2000);
- }catch(InterruptedException ex) {}
+ } catch (InterruptedException ex) {
+ }
}
}
-
- if(transportListener != null) {
+ if (transportListener != null) {
if (connectionFailure instanceof IOException) {
- transportListener.onException((IOException)connectionFailure);
+ transportListener.onException((IOException) connectionFailure);
} else {
- transportListener.onException(IOExceptionSupport.create(connectionFailure));
+ transportListener.onException(IOExceptionSupport.create(connectionFailure));
}
- }
+ }
reconnectMutex.notifyAll();
return false;
}
@@ -809,53 +807,65 @@
return !disposed;
}
-
- final boolean buildBackups() {
- synchronized (backupMutex) {
- if (!disposed && backup && backups.size() < backupPoolSize) {
- List<URI> connectList = getConnectList();
- //removed disposed backups
- List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
- for (BackupTransport bt:backups) {
- if (bt.isDisposed()) {
- disposedList.add(bt);
- }
- }
- backups.removeAll(disposedList);
- disposedList.clear();
- for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
- URI uri = iter.next();
- if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
- try {
- BackupTransport bt = new BackupTransport(this);
- bt.setUri(uri);
- if (!backups.contains(bt)) {
- Transport t = TransportFactory.compositeConnect(uri);
- t.setTransportListener(bt);
- t.start();
- bt.setTransport(t);
- backups.add(bt);
- }
- }catch(Exception e) {
- LOG.debug("Failed to build backup ",e);
- }
- }
- }
- }
- }
- return false;
- }
+ final boolean buildBackups() {
+ synchronized (backupMutex) {
+ if (!disposed && backup && backups.size() < backupPoolSize) {
+ List<URI> connectList = getConnectList();
+ //removed disposed backups
+ List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
+ for (BackupTransport bt : backups) {
+ if (bt.isDisposed()) {
+ disposedList.add(bt);
+ }
+ }
+ backups.removeAll(disposedList);
+ disposedList.clear();
+ for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
+ URI uri = iter.next();
+ if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
+ try {
+ BackupTransport bt = new BackupTransport(this);
+ bt.setUri(uri);
+ if (!backups.contains(bt)) {
+ Transport t = TransportFactory.compositeConnect(uri);
+ t.setTransportListener(bt);
+ t.start();
+ bt.setTransport(t);
+ backups.add(bt);
+ }
+ } catch (Exception e) {
+ LOG.debug("Failed to build backup ", e);
+ }
+ }
+ }
+ }
+ }
+ return false;
+ }
public boolean isDisposed() {
- return disposed;
+ return disposed;
}
-
-
+
public boolean isConnected() {
return connected;
}
-
+
public void reconnect(URI uri) throws IOException {
- add(new URI[] {uri});
+ add(new URI[] { uri });
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.transport.Transport#getWireformat()
+ */
+ public WireFormat getWireformat() {
+ Transport connected = connectedTransport.get();
+ if(connected == null)
+ {
+ return null;
+ }
+ return connected.getWireformat();
}
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -19,14 +19,18 @@
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.util.ByteSequence;
-public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory implements DiscriminatableWireFormatFactory {
+public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory {
- private static final byte MAGIC[] = new byte[] {1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+ private static final byte MAGIC[] = new byte[] { 1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
+
+ public boolean isDiscriminatable() {
+ return true;
+ }
public boolean matchesWireformatHeader(ByteSequence byteSequence) {
- if( byteSequence.length == 4+MAGIC.length ) {
- for( int i=0; i < MAGIC.length; i++ ) {
- if( byteSequence.data[i+4] != MAGIC[i] ) {
+ if (byteSequence.length == 4 + MAGIC.length) {
+ for (int i = 0; i < MAGIC.length; i++) {
+ if (byteSequence.data[i + 4] != MAGIC[i]) {
return false;
}
}
@@ -36,11 +40,6 @@
}
public int maxWireformatHeaderLength() {
- return 4+MAGIC.length;
+ return 4 + MAGIC.length;
}
-
- public String wireformatName() {
- return "openwire";
- }
-
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -23,7 +23,10 @@
public class Proto2WireFormatFactory implements WireFormatFactory {
+
public class TestWireFormat implements StatefulWireFormat {
+ public static final String WIREFORMAT_NAME = "proto";
+
private ByteBuffer currentOut;
private byte outType;
@@ -247,6 +250,14 @@
public int getVersion() {
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormat#getName()
+ */
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
+
public void setVersion(int version) {
}
@@ -272,6 +283,26 @@
public WireFormat createWireFormat() {
return new TestWireFormat();
- }
+ }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -23,6 +23,8 @@
public class ProtoWireFormatFactory implements WireFormatFactory {
public class TestWireFormat implements StatefulWireFormat {
+ public static final String WIREFORMAT_NAME = "proto";
+
private ByteBuffer currentOut;
private byte outType;
@@ -240,10 +242,37 @@
public Transport createTransportFilters(Transport transport, Map options) {
return transport;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormat#getName()
+ */
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
}
public WireFormat createWireFormat() {
return new TestWireFormat();
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -1,5 +1,6 @@
package org.apache.activemq.flow;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.ObjectStreamWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@@ -10,4 +11,24 @@
return new ObjectStreamWireFormat();
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Jun 11 02:18:07 2009
@@ -39,7 +39,7 @@
public class StompWireFormat implements WireFormat {
private static final byte[] NO_DATA = new byte[] {};
- private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
+ private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
private static final int MAX_COMMAND_LENGTH = 1024;
private static final int MAX_HEADER_LENGTH = 1024 * 10;
@@ -47,6 +47,7 @@
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
private int version = 1;
+ public static final String WIREFORMAT_NAME = "stomp";
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -63,7 +64,7 @@
}
public void marshal(Object command, DataOutput os) throws IOException {
- StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
+ StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command;
StringBuffer buffer = new StringBuffer();
buffer.append(stomp.getAction());
@@ -71,7 +72,7 @@
// Output the headers.
for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry)iter.next();
+ Map.Entry entry = (Map.Entry) iter.next();
buffer.append(entry.getKey());
buffer.append(Stomp.Headers.SEPERATOR);
buffer.append(entry.getValue());
@@ -200,14 +201,18 @@
return version;
}
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
+
public void setVersion(int version) {
this.version = version;
}
- public boolean inReceive() {
- //TODO implement the inactivity monitor
- return false;
- }
+ public boolean inReceive() {
+ //TODO implement the inactivity monitor
+ return false;
+ }
public Transport createTransportFilters(Transport transport, Map options) {
if (transport.isUseInactivityMonitor()) {
@@ -215,7 +220,5 @@
}
return transport;
}
-
-
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.stomp;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@@ -26,4 +27,25 @@
public WireFormat createWireFormat() {
return new StompWireFormat();
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader(org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -22,10 +22,14 @@
import org.apache.activemq.transport.stomp.StompWireFormatFactory;
import org.apache.activemq.util.ByteSequence;
-public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
+public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory {
static byte MAGIC[] = toBytes(Stomp.Commands.CONNECT);
+ public boolean isDiscriminatable() {
+ return true;
+ }
+
static private byte[] toBytes(String value) {
try {
return value.getBytes("UTF-8");
@@ -59,9 +63,4 @@
public int maxWireformatHeaderLength() {
return MAGIC.length+10;
}
-
- public String wireformatName() {
- return "stomp";
- }
-
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java Thu Jun 11 02:18:07 2009
@@ -7,17 +7,6 @@
public class StompBrokerTest extends BrokerTestBase {
@Override
- protected void setUp() throws Exception {
- super.setUp();
- if (tcp) {
- sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi&transport.useInactivityMonitor=false";
- receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi&transport.useInactivityMonitor=false";
- sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=stomp&useInactivityMonitor=false";
- receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=stomp&useInactivityMonitor=false";
- }
- }
-
- @Override
protected RemoteProducer cerateProducer() {
return new StompRemoteProducer();
}
@@ -27,5 +16,11 @@
return new StompRemoteConsumer();
}
-
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
+ */
+ @Override
+ protected String getRemoteWireFormat() {
+ return "stomp";
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Thu Jun 11 02:18:07 2009
@@ -20,6 +20,7 @@
import java.net.URI;
import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Represents the client side of a transport allowing messages to be sent
@@ -157,6 +158,11 @@
boolean isConnected();
/**
+ * @return The wireformat for the connection.
+ */
+ WireFormat getWireformat();
+
+ /**
* reconnect to another location
* @param uri
* @throws IOException on failure of if not supported
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Jun 11 02:18:07 2009
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.net.URI;
+import org.apache.activemq.wireformat.WireFormat;
+
/**
* @version $Revision: 1.5 $
*/
@@ -45,7 +47,8 @@
/**
* @see org.apache.activemq.Service#start()
- * @throws IOException if the next channel has not been set.
+ * @throws IOException
+ * if the next channel has not been set.
*/
public void start() throws Exception {
if (next == null) {
@@ -126,19 +129,23 @@
return next.isFaultTolerant();
}
- public boolean isDisposed() {
- return next.isDisposed();
- }
-
- public boolean isConnected() {
+ public boolean isDisposed() {
+ return next.isDisposed();
+ }
+
+ public boolean isConnected() {
return next.isConnected();
}
- public void reconnect(URI uri) throws IOException {
- next.reconnect(uri);
- }
+ public void reconnect(URI uri) throws IOException {
+ next.reconnect(uri);
+ }
public boolean isUseInactivityMonitor() {
return next.isUseInactivityMonitor();
}
+
+ public WireFormat getWireformat() {
+ return next.getWireformat();
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Thu Jun 11 02:18:07 2009
@@ -199,6 +199,13 @@
public void setDispatchPriority(int priority) {
readContext.updatePriority(priority);
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.Transport#getWireformat()
+ */
+ public WireFormat getWireformat() {
+ return wireFormat;
+ }
}
private class PipeTransportServer implements TransportServer {
@@ -267,7 +274,7 @@
String node = uri.getHost();
if (servers.containsKey(node)) {
- throw new IOException("Server allready bound: " + node);
+ throw new IOException("Server already bound: " + node);
}
PipeTransportServer server = new PipeTransportServer();
server.setConnectURI(uri);
@@ -275,6 +282,7 @@
if (options.containsKey("wireFormat")) {
server.setWireFormatFactory(createWireFormatFactory(options));
}
+
servers.put(node, server);
return server;
} catch (URISyntaxException e) {