You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 22:15:15 UTC
svn commit: r743884 - in /activemq/sandbox/activemq-flow/src/test:
java/org/apache/activemq/flow/
resources/META-INF/services/org/apache/activemq/transport/
Author: chirino
Date: Thu Feb 12 21:15:14 2009
New Revision: 743884
URL: http://svn.apache.org/viewvc?rev=743884&view=rev
Log:
Adding a PipeTransport so we can bypass the IO layer in our testing.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 21:15:14 2009
@@ -46,6 +46,9 @@
// Set to mockup up ptp:
boolean ptp = false;
+ // Set to use tcp IO
+ boolean tcp = false;
+
// Can be set to BLOCKING, POLLING or ASYNC
public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;
// Set's the number of threads to use:
@@ -382,12 +385,22 @@
}
if (multibroker) {
- sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
+ if( tcp ) {
+ sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
+ rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
+ } else {
+ sendBroker = createBroker("SendBroker", "pipe://SendBroker");
+ rcvBroker = createBroker("RcvBroker", "pipe://RcvBroker");
+ }
brokers.add(sendBroker);
- rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
brokers.add(rcvBroker);
} else {
- sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
+ if( tcp ) {
+ sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
+ } else {
+ sendBroker = rcvBroker = createBroker("Broker", "pipe://Broker");
+ }
+
brokers.add(sendBroker);
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java Thu Feb 12 21:15:14 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.flow;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
public class Pipe<E> {
private final LinkedBlockingQueue<E> in;
@@ -34,6 +35,10 @@
public void onReadReady(Pipe<E> pipe);
}
+ public Pipe(int capacity) {
+ this(new LinkedBlockingQueue<E>(capacity), new LinkedBlockingQueue<E>(capacity));
+ }
+
public Pipe() {
this(new LinkedBlockingQueue<E>(), new LinkedBlockingQueue<E>());
}
@@ -95,4 +100,17 @@
public E poll() {
return in.poll();
}
+
+ public E poll(long time, TimeUnit unit) throws InterruptedException {
+ return in.poll(time, unit);
+ }
+
+ public boolean offer(E arg0, long arg1, TimeUnit arg2) throws InterruptedException {
+ return out.offer(arg0, arg1, arg2);
+ }
+
+ public boolean offer(E arg0) {
+ return out.offer(arg0);
+ }
+
}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=743884&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Thu Feb 12 21:15:14 2009
@@ -0,0 +1,207 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+public class PipeTransportFactory extends TransportFactory {
+
+ private final HashMap<String,PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+ static final AtomicInteger connectionCounter = new AtomicInteger();
+
+ private static class PipeTransport implements Transport, Runnable {
+
+ private final Pipe<Object> pipe;
+ private TransportListener listener;
+ private String remoteAddress;
+ private AtomicBoolean stopping = new AtomicBoolean();
+ private Thread thread;
+
+ public PipeTransport(Pipe<Object> pipe) {
+ this.pipe = pipe;
+ }
+
+ public void start() throws Exception {
+ thread = new Thread(this, getRemoteAddress());
+ thread.start();
+ }
+
+ public void stop() throws Exception {
+ stopping.set(true);
+ thread.join();
+ }
+
+ public void oneway(Object command) throws IOException {
+ try {
+ while( !stopping.get() ) {
+ if( pipe.offer(command, 500, TimeUnit.MILLISECONDS) ) {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+
+ public void run() {
+ try {
+ while( !stopping.get() ) {
+ Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
+ if( value!=null ) {
+ listener.onCommand(value);
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public TransportListener getTransportListener() {
+ return listener;
+ }
+
+ public boolean isConnected() {
+ return !stopping.get();
+ }
+
+ public boolean isDisposed() {
+ return false;
+ }
+
+ public boolean isFaultTolerant() {
+ return false;
+ }
+
+ public <T> T narrow(Class<T> target) {
+ if (target.isAssignableFrom(getClass())) {
+ return target.cast(this);
+ }
+ return null;
+ }
+
+ public void reconnect(URI uri) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public Object request(Object command) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object request(Object command, int timeout) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setTransportListener(TransportListener listener) {
+ this.listener = listener;
+ }
+
+ public void setRemoteAddress(String remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ }
+
+ private class PipeTransportServer implements TransportServer {
+ private URI connectURI;
+ private TransportAcceptListener listener;
+ private String name;
+
+ public URI getConnectURI() {
+ return connectURI;
+ }
+
+ public InetSocketAddress getSocketAddress() {
+ return null;
+ }
+
+ public void setAcceptListener(TransportAcceptListener listener) {
+ this.listener = listener;
+ }
+
+ public void setBrokerInfo(BrokerInfo brokerInfo) {
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ unbind(this);
+ }
+
+ public void setConnectURI(URI connectURI) {
+ this.connectURI = connectURI;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Transport connect() {
+ int connectionId = connectionCounter.incrementAndGet();
+ String remoteAddress = connectURI.toString()+"#"+connectionId;
+ assert listener!= null: "Server does not have an accept listener";
+ Pipe<Object> pipe = new Pipe<Object>(10);
+ PipeTransport rc = new PipeTransport(pipe);
+ rc.setRemoteAddress(remoteAddress);
+ PipeTransport serverSide = new PipeTransport(pipe.connect());
+ serverSide.setRemoteAddress(remoteAddress);
+ listener.onAccept(serverSide);
+ return rc;
+ }
+ }
+
+ @Override
+ public synchronized TransportServer doBind(URI uri) throws IOException {
+ String node = uri.getHost();
+ if( servers.containsKey(node) ) {
+ throw new IOException("Server allready bound: "+node);
+ }
+ PipeTransportServer server = new PipeTransportServer();
+ server.setConnectURI(uri);
+ server.setName(node);
+ servers.put(node, server);
+ return server;
+ }
+
+ private synchronized void unbind(PipeTransportServer server) {
+ servers.remove(server.getName());
+ }
+
+ @Override
+ public synchronized Transport doCompositeConnect(URI location) throws Exception {
+ String name = location.getHost();
+ PipeTransportServer server = servers.get(name );
+ if( server==null ) {
+ throw new IOException("Server is not bound: "+name);
+ }
+ return server.connect();
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Thu Feb 12 21:15:14 2009
@@ -23,6 +23,8 @@
final void route(ISourceController<Message> source, Message msg) {
Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
+ if( targets == null )
+ return;
for (DeliveryTarget dt : targets) {
if (dt.match(msg)) {
dt.getSink().add(msg, source);
Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe?rev=743884&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe Thu Feb 12 21:15:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.PipeTransportFactory