You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/12 22:15:15 UTC

svn commit: r743884 - in /activemq/sandbox/activemq-flow/src/test: java/org/apache/activemq/flow/ resources/META-INF/services/org/apache/activemq/transport/

Author: chirino
Date: Thu Feb 12 21:15:14 2009
New Revision: 743884

URL: http://svn.apache.org/viewvc?rev=743884&view=rev
Log:
Adding a PipeTransport so we can bypass the IO layer in our testing.


Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/
    activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe
Modified:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 21:15:14 2009
@@ -46,6 +46,9 @@
     // Set to mockup up ptp:
     boolean ptp = false;
 
+    // Set to use tcp IO
+    boolean tcp = false;
+
     // Can be set to BLOCKING, POLLING or ASYNC
     public final static int DISPATCH_MODE = AbstractTestConnection.ASYNC;
     // Set's the number of threads to use:
@@ -382,12 +385,22 @@
         }
 
         if (multibroker) {
-            sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
+            if( tcp ) {
+                sendBroker = createBroker("SendBroker", "tcp://localhost:10000?wireFormat=test");
+                rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
+            } else {
+                sendBroker = createBroker("SendBroker", "pipe://SendBroker");
+                rcvBroker = createBroker("RcvBroker", "pipe://RcvBroker");
+            }
             brokers.add(sendBroker);
-            rcvBroker = createBroker("RcvBroker", "tcp://localhost:20000?wireFormat=test");
             brokers.add(rcvBroker);
         } else {
-            sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
+            if( tcp ) {
+                sendBroker = rcvBroker = createBroker("Broker", "tcp://localhost:10000?wireFormat=test");
+            } else {
+                sendBroker = rcvBroker = createBroker("Broker", "pipe://Broker");
+            }
+            
             brokers.add(sendBroker);
         }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java Thu Feb 12 21:15:14 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.flow;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class Pipe<E> {
     private final LinkedBlockingQueue<E> in;
@@ -34,6 +35,10 @@
         public void onReadReady(Pipe<E> pipe);
     }
 
+    public Pipe(int capacity) {
+        this(new LinkedBlockingQueue<E>(capacity), new LinkedBlockingQueue<E>(capacity));
+    }
+    
     public Pipe() {
         this(new LinkedBlockingQueue<E>(), new LinkedBlockingQueue<E>());
     }
@@ -95,4 +100,33 @@
     public E poll() {
         return in.poll();
     }
+
+    /**
+     * Retrieves the next element from the <code>in</code> queue, waiting up to the given
+     * time for one to become available.
+     *
+     * @return the element, or null if the wait timed out
+     */
+    public E poll(long time, TimeUnit unit) throws InterruptedException {
+        return in.poll(time, unit);
+    }
+
+    /**
+     * Offers an element to the <code>out</code> queue, waiting up to the given time for space.
+     *
+     * @return true if the element was accepted, false if the wait timed out
+     */
+    public boolean offer(E element, long timeout, TimeUnit unit) throws InterruptedException {
+        return out.offer(element, timeout, unit);
+    }
+
+    /**
+     * Offers an element to the <code>out</code> queue without waiting.
+     *
+     * @return true if the element was accepted, false if the queue had no space
+     */
+    public boolean offer(E element) {
+        return out.offer(element);
+    }
+
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=743884&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Thu Feb 12 21:15:14 2009
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+
+/**
+ * Test-only {@link TransportFactory} for the <code>pipe</code> URI scheme. Connections are backed
+ * by an in-memory {@link Pipe} instead of a socket, so tests can bypass the IO layer.
+ * <p>
+ * A server is bound under the host part of its URI (<code>pipe://Broker</code> binds "Broker");
+ * clients connect by using a URI with the same host part.
+ */
+public class PipeTransportFactory extends TransportFactory {
+
+    /** Bound servers keyed by name; only accessed from the synchronized factory methods. */
+    private final HashMap<String,PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+    /** Source of the connection ids appended to remote addresses. */
+    static final AtomicInteger connectionCounter = new AtomicInteger();
+
+    /**
+     * One end of a pipe connection. Outbound commands are offered to the pipe; a dedicated
+     * thread polls the pipe and hands inbound commands to the {@link TransportListener}.
+     */
+    private static class PipeTransport implements Transport, Runnable {
+
+        private final Pipe<Object> pipe;
+        private TransportListener listener;
+        private String remoteAddress;
+        private final AtomicBoolean stopping = new AtomicBoolean();
+        private Thread thread;
+
+        public PipeTransport(Pipe<Object> pipe) {
+            this.pipe = pipe;
+        }
+
+        /** Starts the reader thread, named after the remote address. */
+        public void start() throws Exception {
+            thread = new Thread(this, getRemoteAddress());
+            thread.start();
+        }
+
+        /** Signals the reader thread to stop and waits for it to exit. */
+        public void stop() throws Exception {
+            stopping.set(true);
+            Thread reader = thread;
+            // Nothing to join if start() was never called; and joining the current thread
+            // (stop() invoked from a listener callback) would block forever.
+            if( reader!=null && reader!=Thread.currentThread() ) {
+                reader.join();
+            }
+        }
+
+        /**
+         * Offers the command to the pipe, retrying in 500ms slices until it is accepted.
+         * If the transport is stopped before that happens the command is dropped.
+         *
+         * @throws InterruptedIOException if the calling thread is interrupted while waiting
+         */
+        public void oneway(Object command) throws IOException {
+            try {
+                while( !stopping.get() ) {
+                    if( pipe.offer(command, 500, TimeUnit.MILLISECONDS) ) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so code further up the stack can see it.
+                Thread.currentThread().interrupt();
+                InterruptedIOException ioe = new InterruptedIOException("Interrupted while sending to "+remoteAddress);
+                ioe.initCause(e);
+                throw ioe;
+            }
+        }
+
+        /**
+         * Reader loop: polls the pipe in 500ms slices so that a stop request is noticed, and
+         * passes each received command to the transport listener.
+         */
+        public void run() {
+            try {
+                while( !stopping.get() ) {
+                    Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
+                    if( value!=null ) {
+                        listener.onCommand(value);
+                    }
+                }
+            } catch (InterruptedException e) {
+                // Interruption ends the loop; keep the interrupt status instead of discarding it.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public String getRemoteAddress() {
+            return remoteAddress;
+        }
+
+        public TransportListener getTransportListener() {
+            return listener;
+        }
+
+        /** Reports connected until {@link #stop()} has been requested. */
+        public boolean isConnected() {
+            return !stopping.get();
+        }
+
+        public boolean isDisposed() {
+            return false;
+        }
+
+        public boolean isFaultTolerant() {
+            return false;
+        }
+
+        public <T> T narrow(Class<T> target) {
+            if (target.isAssignableFrom(getClass())) {
+                return target.cast(this);
+            }
+            return null;
+        }
+
+        // Reconnects and request/response exchanges are not supported by this transport.
+
+        public void reconnect(URI uri) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public Object request(Object command) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public Object request(Object command, int timeout) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public void setTransportListener(TransportListener listener) {
+            this.listener = listener;
+        }
+
+        public void setRemoteAddress(String remoteAddress) {
+            this.remoteAddress = remoteAddress;
+        }
+
+    }
+
+    /**
+     * Server side of the pipe transport. Not static because stopping the server unbinds it
+     * from the enclosing factory.
+     */
+    private class PipeTransportServer implements TransportServer {
+        private URI connectURI;
+        private TransportAcceptListener listener;
+        private String name;
+
+        public URI getConnectURI() {
+            return connectURI;
+        }
+
+        /** @return always null; a pipe server has no socket. */
+        public InetSocketAddress getSocketAddress() {
+            return null;
+        }
+
+        public void setAcceptListener(TransportAcceptListener listener) {
+            this.listener = listener;
+        }
+
+        public void setBrokerInfo(BrokerInfo brokerInfo) {
+        }
+
+        public void start() throws Exception {
+        }
+
+        /** Removes this server from the factory so that its name can be bound again. */
+        public void stop() throws Exception {
+            unbind(this);
+        }
+
+        public void setConnectURI(URI connectURI) {
+            this.connectURI = connectURI;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * Creates a pair of transports connected by one pipe. The server-side end is handed
+         * to the accept listener and the client-side end is returned.
+         *
+         * @throws IllegalStateException if no accept listener has been set
+         */
+        public Transport connect() {
+            // An assert is skipped unless the JVM runs with -ea, so check explicitly.
+            if( listener==null ) {
+                throw new IllegalStateException("Server does not have an accept listener");
+            }
+            int connectionId = connectionCounter.incrementAndGet();
+            String remoteAddress = connectURI.toString()+"#"+connectionId;
+            Pipe<Object> pipe = new Pipe<Object>(10);
+            PipeTransport rc = new PipeTransport(pipe);
+            rc.setRemoteAddress(remoteAddress);
+            PipeTransport serverSide = new PipeTransport(pipe.connect());
+            serverSide.setRemoteAddress(remoteAddress);
+            listener.onAccept(serverSide);
+            return rc;
+        }
+    }
+
+    /**
+     * Binds a new server under the host part of the URI.
+     *
+     * @throws IOException if a server is already bound under that name
+     */
+    @Override
+    public synchronized TransportServer doBind(URI uri) throws IOException {
+        String node = uri.getHost();
+        if( servers.containsKey(node) ) {
+            throw new IOException("Server allready bound: "+node);
+        }
+        PipeTransportServer server = new PipeTransportServer();
+        server.setConnectURI(uri);
+        server.setName(node);
+        servers.put(node, server);
+        return server;
+    }
+
+    private synchronized void unbind(PipeTransportServer server) {
+        servers.remove(server.getName());
+    }
+
+    /**
+     * Connects to the server bound under the host part of the URI.
+     *
+     * @throws IOException if no server is bound under that name
+     */
+    @Override
+    public synchronized Transport doCompositeConnect(URI location) throws Exception {
+        String name = location.getHost();
+        PipeTransportServer server = servers.get(name);
+        if( server==null ) {
+            throw new IOException("Server is not bound: "+name);
+        }
+        return server.connect();
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743884&r1=743883&r2=743884&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Thu Feb 12 21:15:14 2009
@@ -23,6 +23,8 @@
 
     final void route(ISourceController<Message> source, Message msg) {
         Collection<DeliveryTarget> targets = lookupTable.get(msg.getDestination());
+        if( targets == null ) 
+            return;
         for (DeliveryTarget dt : targets) {
             if (dt.match(msg)) {
                 dt.getSink().add(msg, source);

Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe?rev=743884&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe (added)
+++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/transport/pipe Thu Feb 12 21:15:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.PipeTransportFactory