You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/18 05:28:52 UTC

svn commit: r892128 [2/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dis...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java Fri Dec 18 04:28:49 2009
@@ -21,12 +21,14 @@
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.actor.ActorProxy;
-import org.apache.activemq.dispatch.DispatchObject;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.perf.DispatchObjectFilter;
 import org.apache.activemq.queue.actor.transport.Transport;
 import org.apache.activemq.queue.actor.transport.TransportFactory;
 import org.apache.activemq.queue.actor.transport.TransportHandler;
@@ -45,7 +47,8 @@
 public class PipeTransportFactory implements TransportFactory {
 
     static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
-    
+    static final AtomicLong CONNECTION_IDS = new AtomicLong();
+
     static void perform_unbind(PipeTransportServer server) {
         synchronized(servers) {
             servers.remove(server.name);
@@ -69,7 +72,10 @@
                 throw new IOException("Server not bound: " + clientTransport.name);
             }
         }
-        PipeTransport serverTransport = new PipeTransport(server.dispatcher);
+        
+        long connectionId = clientTransport.connectionId;
+        String remoteAddress = clientTransport.connnectAddress+":server:"+connectionId;
+        PipeTransport serverTransport = new PipeTransport(server.dispatcher, connectionId, remoteAddress);
         clientTransport.peer = serverTransport.actor;
         serverTransport.peer = clientTransport.actor;
         server.actor.onConnect(serverTransport);
@@ -87,13 +93,12 @@
             throw new IllegalArgumentException("Invalid bind uri: "+e, e);
         }
 
-        PipeTransportServer rc = new PipeTransportServer(dispatcher);
+        PipeTransportServer rc = new PipeTransportServer(dispatcher, name);
         rc.connectURI = bindUri;
         IntrospectionSupport.setProperties(rc, options);
         if (!options.isEmpty()) {
             throw new IllegalArgumentException("Invalid bind uri parameters: " + options);
         }
-        rc.name = name;
         return rc;
     }
 
@@ -109,7 +114,9 @@
             throw new IllegalArgumentException("Invalid connect uri: "+e, e);
         }
         
-        PipeTransport rc = new PipeTransport(dispatcher);
+        long connectionId = CONNECTION_IDS.incrementAndGet();
+        String remoteAddress = connectUri+":client:"+connectionId;
+        PipeTransport rc = new PipeTransport(dispatcher, connectionId, remoteAddress);
         IntrospectionSupport.setProperties(rc, options);
         if (!options.isEmpty()) {
             throw new IllegalArgumentException("Invalid connect uri parameters: " + options);
@@ -120,45 +127,6 @@
 
     }
     
-    public static class DispatchObjectFilter implements DispatchObject {
-        protected DispatchObject next;
-        
-        public DispatchObjectFilter() {
-        }
-
-        public DispatchObjectFilter(DispatchObject next) {
-            this.next = next;
-        }
-        
-        public void addShutdownWatcher(Runnable shutdownWatcher) {
-            next.addShutdownWatcher(shutdownWatcher);
-        }
-        public <Context> Context getContext() {
-            return next.getContext();
-        }
-        public DispatchQueue getTargetQueue() {
-            return next.getTargetQueue();
-        }
-        public void release() {
-            next.release();
-        }
-        public void resume() {
-            next.resume();
-        }
-        public void retain() {
-            next.retain();
-        }
-        public <Context> void setContext(Context context) {
-            next.setContext(context);
-        }
-        public void setTargetQueue(DispatchQueue queue) {
-            next.setTargetQueue(queue);
-        }
-        public void suspend() {
-            next.suspend();
-        }
-    }
-    
     interface PipeTransportServerActor {
         public void onBind();
         public void onConnect(PipeTransport serverSide);
@@ -179,11 +147,11 @@
         protected boolean marshal;
         
         protected final AtomicInteger suspendCounter = new AtomicInteger();
-        protected long connectionCounter;
 
-        public PipeTransportServer(Dispatcher dispatcher) {
-            super( dispatcher.createSerialQueue(null) );
+        public PipeTransportServer(Dispatcher dispatcher, String name) {
+            super( dispatcher.createSerialQueue(name) );
             this.dispatcher = dispatcher;
+            this.name=name;
             dispatchQueue = (DispatchQueue) next;
             dispatchQueue.suspend();
             dispatchQueue.addShutdownWatcher(new Runnable() {
@@ -214,8 +182,6 @@
         }
 
         public void onConnect(PipeTransport serverSide) {
-            long connectionId = connectionCounter++;
-            serverSide.remoteAddress = connectURI.toString() + "#" + connectionId;
             handler.onAccept(serverSide);
         }
 
@@ -266,21 +232,18 @@
         private String wireFormat;
         private boolean marshal;
         
+        protected final AtomicBoolean connected = new AtomicBoolean();
         protected final AtomicInteger suspendCounter = new AtomicInteger();
-        
-        public PipeTransport(Dispatcher dispatcher) {
-            super( dispatcher.createSerialQueue(null) );
+        private final long connectionId;
+
+        final protected AtomicInteger retained = new AtomicInteger(1);
+
+        public PipeTransport(Dispatcher dispatcher, long connectionId, String remoteAddress) {
+            super( dispatcher.createSerialQueue(remoteAddress) );
+            this.connectionId = connectionId;
             this.dispatchQueue = (DispatchQueue) next;
             this.dispatchQueue.suspend();
-            this.dispatchQueue.addShutdownWatcher(new Runnable() {
-                public void run() {
-                    PipeTransportActor peer = PipeTransport.this.peer;
-                    if( peer!=null ) {
-                        peer.onDisconnect();
-                        peer = null;
-                    }
-                }
-            });
+            this.remoteAddress = remoteAddress;
             
             // Queue up the connect event so it's the first thing that gets executed when
             // this object gets resumed..
@@ -288,28 +251,65 @@
             this.actor.onConnect();
         }
         
+        public void retain() {
+            retained.getAndIncrement();
+        }
+
+        public void release() {
+            if (retained.decrementAndGet() == 0) {
+                PipeTransportActor peer = PipeTransport.this.peer;
+                if( peer!=null ) {
+                    peer.onDisconnect();
+                }
+                dispatchQueue.dispatchAsync(new Runnable(){
+                    public void run() {
+                        handler = null;
+                    }
+                });
+                dispatchQueue.release();                
+            }
+        }
+        
         public void setHandler(TransportHandler hanlder) {
             this.handler = hanlder;
         }
         
         public void onConnect() {
             try {
+                if( !connected.compareAndSet(false, true) ) {
+                    throw new IOException("allready connected.");
+                }
+                
                 if (connnectAddress != null) {
                     // Client side connect case...
                     perform_connect(this);
-                    remoteAddress = connnectAddress;
-                    handler.onConnect();
+                    if( handler!=null ) {
+                        handler.onConnect();
+                    }
                 } else {
                     // Server side connect case...
-                    if( peer==null || remoteAddress==null ) {
+                    if( peer==null ) {
                         throw new IOException("Server transport not properly initialized.");
                     }
-                    handler.onConnect();
+                    if( handler!=null ) {
+                        handler.onConnect();
+                    }
                 }
+                dispatchQueue.retain();
             } catch (IOException e) {
-                handler.onFailure(e);
+                onFailure(e);
             }
         }
+        
+        public void onDisconnect() {
+            if( !connected.compareAndSet(true, false) ) {
+                throw new AssertionError("Was not connected.");
+            }
+            if( handler!=null ) {
+                handler.onDisconnect();
+            }
+            dispatchQueue.release();
+        }
 
         public void send(Object message) {
             send(message, null, null);
@@ -328,21 +328,26 @@
                 complete(onCompleted, queue);
                 return;
             }
+            if( queue!=null ) {
+                queue.retain();
+            }
             peer.onDispatch(message, onCompleted, queue);
         }
 
         public void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue) {
             try {
-                Object m = message;
-                if (wf != null && marshal) {
-                    try {
-                        m = wf.unmarshal((Buffer) m);
-                    } catch (IOException e) {
-                        handler.onFailure(e);
-                        return;
+                if( handler!=null ) {
+                    Object m = message;
+                    if (wf != null && marshal) {
+                        try {
+                            m = wf.unmarshal((Buffer) m);
+                        } catch (IOException e) {
+                            handler.onFailure(e);
+                            return;
+                        }
                     }
+                    handler.onRecevie(m);
                 }
-                handler.onRecevie(m);
             } finally {
                 complete(onCompleted, queue);
             }
@@ -352,17 +357,19 @@
             if( onCompleted!=null ) {
                 if(queue!=null) {
                     queue.dispatchAsync(onCompleted);
+                    if( queue!=null ) {
+                        queue.release();
+                    }
                 } else {
                     onCompleted.run();
                 }
             }
         }
         
-        public void onDisconnect() {
-            handler.onDisconnect();
-        }
         public void onFailure(Exception e) {
-            handler.onFailure(e);
+            if( handler!=null ) {
+                handler.onFailure(e);
+            }
         }
 
         public String getRemoteAddress() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Fri Dec 18 04:28:49 2009
@@ -210,7 +210,7 @@
         broker.setName("Broker");
         broker.createDispatcher();
         try {
-            broker.getDispatcher().retain();
+            broker.getDispatcher().resume();
             broker.startServices();
         } catch (Exception e) {
             // TODO Auto-generated catch block

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Fri Dec 18 04:28:49 2009
@@ -255,7 +255,7 @@
     private void createConnections(int destCount) throws Exception {
 
         dispatcher = createDispatcher("BrokerDispatcher");
-        dispatcher.retain();
+        dispatcher.resume();
 
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -287,7 +287,7 @@
         Dispatcher clientDispatcher = null;
         if (SEPARATE_CLIENT_DISPATCHER) {
             clientDispatcher = createDispatcher("ClientDispatcher");
-            clientDispatcher.retain();
+            clientDispatcher.resume();
         } else {
             clientDispatcher = dispatcher;
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Fri Dec 18 04:28:49 2009
@@ -219,7 +219,7 @@
     }
 
     public void runTest() throws Exception {
-        getDispatcher().retain();
+        getDispatcher().resume();
 
         // Start 'em up.
         startServices();
@@ -304,7 +304,7 @@
         System.out.println(IntrospectionSupport.toString(test));
         try
         {
-            test.getDispatcher().retain();
+            test.getDispatcher().resume();
             test.createConnections();
             test.runTest();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Fri Dec 18 04:28:49 2009
@@ -254,14 +254,22 @@
     }
 
     public static String toString(Object target) {
-        return toString(target, Object.class, null);
+        return toString(target, Object.class, null, null);
+    }
+    
+    public static String toString(Object target, String...fields) {
+        return toString(target, Object.class, null, fields);
     }
     
     public static String toString(Object target, Class<?> stopClass) {
-    	return toString(target, stopClass, null);
+    	return toString(target, stopClass, null, null);
+    }
+
+    public static String toString(Object target, Map<String, Object> overrideFields, String...fields) {
+        return toString(target, Object.class, overrideFields, fields);
     }
 
-    public static String toString(Object target, Class<?> stopClass, Map<String, Object> overrideFields) {
+    public static String toString(Object target, Class<?> stopClass, Map<String, Object> overrideFields, String fields[]) {
         try {
             LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
             addFields(target, target.getClass(), stopClass, map);
@@ -271,6 +279,10 @@
             	    map.put(key, value);
             	}
             }
+            
+            if( fields!=null ) {
+                map.keySet().retainAll(Arrays.asList(fields));
+            }
            
             boolean useMultiLine=false;
             LinkedHashMap<String, String> props = new LinkedHashMap<String, String>();
@@ -313,7 +325,8 @@
                     }
                     buffer.append(entry.getKey());
                     buffer.append(": ");
-                    buffer.append(StringSupport.indent(entry.getValue(), 2));
+                    String value = entry.getValue();
+                    buffer.append(StringSupport.indent(value, 2));
                 }
                 buffer.append("}");
             }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java Fri Dec 18 04:28:49 2009
@@ -26,6 +26,9 @@
 public class StringSupport {
 
     public static String indent(String value, int spaces) {
+        if( value == null ) {
+            return null;
+        }
         String indent = fillString(spaces, ' ');
         return value.replaceAll("(\\r?\\n)", "$1"+indent);
     }