You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/15 23:49:10 UTC
svn commit: r784998 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/java/org/apache/activemq/apollo/t...
Author: chirino
Date: Mon Jun 15 21:49:09 2009
New Revision: 784998
URL: http://svn.apache.org/viewvc?rev=784998&view=rev
Log:
Fixing up the way pipe and vm transports are initialized so that they work the same way that tcp does.
Added:
activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/
activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Mon Jun 15 21:49:09 2009
@@ -34,7 +34,7 @@
public BrokerConnection() {
setExceptionListener(new ExceptionListener(){
public void exceptionThrown(Exception error) {
- LOG.debug("Transport failed before messaging protocol was initialized.", error);
+ LOG.info("Transport failed before messaging protocol was initialized.", error);
try {
stop();
} catch (Exception ignore) {
@@ -78,6 +78,9 @@
protocolHandler.onException(error);
}
});
+
+ protocolHandler.onCommand(command);
+
} catch (Exception e) {
onException(e);
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Mon Jun 15 21:49:09 2009
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,7 +30,9 @@
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.pipe.Pipe;
import org.apache.activemq.transport.pipe.PipeTransportFactory;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,63 +93,69 @@
private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString();
@Override
- synchronized public Transport doCompositeConnect(URI location) throws Exception {
-
- String brokerURI = null;
- String name;
- boolean create = true;
-
- name = location.getHost();
- if (name == null) {
- name = DEFAULT_PIPE_NAME;
- }
+ protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+ try {
+
+ String brokerURI = null;
+ String name;
+ boolean create = true;
+
+ name = location.getHost();
+ if (name == null) {
+ name = DEFAULT_PIPE_NAME;
+ }
- Map<String, String> options = URISupport.parseParamters(location);
- String config = (String) options.remove("broker");
- if (config != null) {
- brokerURI = config;
- }
- if ("false".equals(options.remove("create"))) {
- create = false;
- }
- if( !options.isEmpty() ) {
- throw new IllegalArgumentException("Unrecognized vm transport parameters: "+options.keySet());
- }
+ Map<String, String> options = URISupport.parseParamters(location);
+ String config = (String) options.remove("broker");
+ if (config != null) {
+ brokerURI = config;
+ }
+ if ("false".equals(options.remove("create"))) {
+ create = false;
+ }
- PipeTransportServer server = servers.get(name);
- if (server == null && create) {
-
- // Create the broker on demand.
- Broker broker;
- if( brokerURI == null ) {
- broker = new Broker();
- } else {
- broker = BrokerFactory.createBroker(brokerURI);
- }
-
- // Remove the existing pipe severs if the broker is configured with one... we want to make sure it
- // uses the one we explicitly configure here.
- for (TransportServer s : broker.getTransportServers()) {
- if (s instanceof PipeTransportServer && name.equals(((PipeTransportServer) s).getName())) {
- broker.removeTransportServer(s);
+ PipeTransportServer server = servers.get(name);
+ if (server == null && create) {
+
+ // Create the broker on demand.
+ Broker broker;
+ if( brokerURI == null ) {
+ broker = new Broker();
+ } else {
+ broker = BrokerFactory.createBroker(brokerURI);
}
+
+ // Remove the existing pipe severs if the broker is configured with one... we want to make sure it
+ // uses the one we explicitly configure here.
+ for (TransportServer s : broker.getTransportServers()) {
+ if (s instanceof PipeTransportServer && name.equals(((PipeTransportServer) s).getName())) {
+ broker.removeTransportServer(s);
+ }
+ }
+
+ // We want to use a vm transport server impl.
+ VmTransportServer vmTransportServer = (VmTransportServer) TransportFactory.bind(new URI("vm://" + name));
+ vmTransportServer.setBroker(broker);
+ broker.addTransportServer(vmTransportServer);
+ broker.start();
+
+ server = servers.get(name);
+ }
+
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name);
}
- // We want to use a vm transport server impl.
- VmTransportServer vmTransportServer = (VmTransportServer) TransportFactory.bind(new URI("vm://" + name));
- vmTransportServer.setBroker(broker);
- broker.addTransportServer(vmTransportServer);
- broker.start();
+ PipeTransport transport = server.connect();
+ transport.setWireFormat(wf);
+ return transport;
- server = servers.get(name);
- }
-
- if (server == null) {
- throw new IOException("Server is not bound: " + name);
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
}
-
- return server.connect();
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Mon Jun 15 21:49:09 2009
@@ -35,7 +35,7 @@
}
public void testAutoCreateBroker() throws Exception {
- Transport connect = TransportFactory.compositeConnect(new URI("vm://test"));
+ Transport connect = TransportFactory.compositeConnect(new URI("vm://test?wireFormat=mock"));
assertNotNull(connect);
connect.stop();
System.out.println("done");
@@ -43,7 +43,7 @@
public void testNoAutoCreateBroker() throws Exception {
try {
- TransportFactory.compositeConnect(new URI("vm://test?create=false"));
+ TransportFactory.compositeConnect(new URI("vm://test?create=false&wireFormat=mock"));
fail("Expected a IOException");
} catch (IOException e) {
}
@@ -51,7 +51,7 @@
public void testBadOptions() throws Exception {
try {
- TransportFactory.compositeConnect(new URI("vm://test?crazy-option=false"));
+ TransportFactory.compositeConnect(new URI("vm://test?crazy-option=false&wireFormat=mock"));
fail("Expected a IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java Mon Jun 15 21:49:09 2009
@@ -30,25 +30,27 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
-import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.apollo.AutoFailTestSupport;
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.transport.vm.VMTransportFactory;
import org.apache.activemq.broker.store.memory.MemoryStore;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
+public class ActiveMQConnectionFactoryTest extends AutoFailTestSupport {
private static final Log LOG = LogFactory.getLog(ActiveMQConnectionFactoryTest.class);
private ActiveMQConnection connection;
private Broker broker;
+
+ static {
+ System.setProperty("org.apache.activemq.default.directory.prefix", "target/test-data/");
+ }
public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
- "vm://localhost?jms.clientIDPrefix=Cheese");
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese");
assertEquals("Cheese", cf.getClientIDPrefix());
connection = (ActiveMQConnection)cf.createConnection();
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Jun 15 21:49:09 2009
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import org.apache.activemq.apollo.WindowLimiter;
+import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerConnection;
import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
import org.apache.activemq.apollo.broker.BrokerSubscription;
@@ -204,9 +205,12 @@
// the details about this
// broker.
BrokerInfo brokerInfo = new BrokerInfo();
- brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
- brokerInfo.setBrokerName(connection.getBroker().getName());
- brokerInfo.setBrokerURL(connection.getBroker().getConnectUris().get(0));
+ Broker broker = connection.getBroker();
+ brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+ brokerInfo.setBrokerName(broker.getName());
+ if( !broker.getConnectUris().isEmpty() ) {
+ brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+ }
connection.write(brokerInfo);
return ack(info);
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java Mon Jun 15 21:49:09 2009
@@ -65,7 +65,7 @@
}
public void write(E o) throws InterruptedException {
- if (mode == BLOCKING) {
+ if (peer.mode == BLOCKING) {
out.put(o);
return;
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=784998&r1=784997&r2=784998&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Mon Jun 15 21:49:09 2009
@@ -6,6 +6,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -26,6 +27,7 @@
import org.apache.activemq.transport.pipe.Pipe.ReadReadyListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@@ -45,6 +47,8 @@
private DispatchContext readContext;
private String name;
private WireFormat wireFormat;
+ private boolean marshall;
+ private boolean trace;
public PipeTransport(Pipe<Object> pipe) {
this.pipe = pipe;
@@ -89,7 +93,7 @@
public void oneway(Object command) throws IOException {
try {
- if (wireFormat != null) {
+ if (wireFormat != null && marshall) {
pipe.write(wireFormat.marshal(command));
} else {
pipe.write(command);
@@ -115,7 +119,7 @@
if(o == EOF_TOKEN) {
throw new EOFException();
}
- if (wireFormat != null) {
+ if (wireFormat != null && marshall) {
listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
} else {
listener.onCommand(o);
@@ -213,12 +217,17 @@
readContext.updatePriority(priority);
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.Transport#getWireformat()
- */
public WireFormat getWireformat() {
return wireFormat;
}
+
+ public boolean isTrace() {
+ return trace;
+ }
+
+ public void setTrace(boolean trace) {
+ this.trace = trace;
+ }
}
protected class PipeTransportServer implements TransportServer {
@@ -259,7 +268,7 @@
return name;
}
- public Transport connect() {
+ public PipeTransport connect() {
int connectionId = connectionCounter.incrementAndGet();
String remoteAddress = connectURI.toString() + "#" + connectionId;
assert listener != null : "Server does not have an accept listener";
@@ -301,10 +310,7 @@
PipeTransportServer server = createTransportServer();
server.setConnectURI(uri);
server.setName(node);
- if (options.containsKey("wireFormat")) {
- server.setWireFormatFactory(createWireFormatFactory(options));
- }
-
+ server.setWireFormatFactory(createWireFormatFactory(options));
servers.put(node, server);
return server;
}
@@ -316,18 +322,7 @@
protected PipeTransportServer createTransportServer() {
return new PipeTransportServer();
}
-
- @Override
- public Transport doCompositeConnect(URI location) throws Exception {
- String name = location.getHost();
- synchronized(servers) {
- PipeTransportServer server = lookup(name);
- if (server == null) {
- throw new IOException("Server is not bound: " + name);
- }
- return server.connect();
- }
- }
+
static public PipeTransportServer lookup(String name) {
synchronized(servers) {
@@ -346,5 +341,47 @@
servers.remove(server.getName());
}
}
+
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ PipeTransport pipeTransport = (PipeTransport)transport.narrow(PipeTransport.class);
+ IntrospectionSupport.setProperties(pipeTransport, options);
+
+ if (pipeTransport.isTrace()) {
+ throw new UnsupportedOperationException("Trace not implemented");
+// try {
+// transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, pipeTransport.getLogWriterName(),
+// pipeTransport.isDynamicManagement(), pipeTransport.isStartLogging(), pipeTransport.getJmxPort());
+// } catch (Throwable e) {
+// LOG.error("Could not create TransportLogger object for: " + pipeTransport.getLogWriterName() + ", reason: " + e, e);
+// }
+ }
+
+ transport = format.createTransportFilters(transport, options);
+ return transport;
+ }
+
+ protected String getOption(Map options, String key, String def) {
+ String rc = (String) options.remove(key);
+ if( rc == null ) {
+ rc = def;
+ }
+ return rc;
+ }
+
+ protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+ String name = location.getHost();
+ synchronized(servers) {
+ PipeTransportServer server = lookup(name);
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name);
+ }
+ PipeTransport transport = server.connect();
+ transport.setWireFormat(wf);
+ return transport;
+ }
+ }
+
+
+
}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=784998&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Mon Jun 15 21:49:09 2009
@@ -0,0 +1,67 @@
+package org.apache.activemq.wireformat.mock;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class MockWireFormatFactory implements WireFormatFactory {
+
+ public class MockWireFormat implements WireFormat {
+ public Transport createTransportFilters(Transport transport, Map options) {
+ return transport;
+ }
+
+ public String getName() {
+ return "mock";
+ }
+
+ public int getVersion() {
+ return 0;
+ }
+
+ public boolean inReceive() {
+ return false;
+ }
+
+ public void setVersion(int version) {
+ }
+
+ public ByteSequence marshal(Object command) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void marshal(Object command, DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object unmarshal(ByteSequence packet) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object unmarshal(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public WireFormat createWireFormat() {
+ return new MockWireFormat();
+ }
+
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock?rev=784998&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock Mon Jun 15 21:49:09 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.wireformat.mock.MockWireFormatFactory
\ No newline at end of file