You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:51:26 UTC

svn commit: r961089 - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-selector/ activemq-store/ activemq-tcp/ activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/ active...

Author: chirino
Date: Wed Jul  7 03:51:25 2010
New Revision: 961089

URL: http://svn.apache.org/viewvc?rev=961089&view=rev
Log:
tcp transport has much more improved state tracking an cleanup.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
    activemq/sandbox/activemq-apollo-actor/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 03:51:25 2010
@@ -39,18 +39,18 @@ trait BaseService extends Service {
     def done = callbacks.foreach(_.run)
   }
 
-  case class CREATED extends State
-  case class STARTING extends State with CallbackSupport
-  case class STARTED extends State
-  case class STOPPING extends State with CallbackSupport
-  case class STOPPED extends State
+  object CREATED extends State
+  class  STARTING extends State with CallbackSupport
+  object STARTED extends State
+  class  STOPPING extends State with CallbackSupport
+  object STOPPED extends State
 
   val dispatchQueue:DispatchQueue
 
   final def start() = start(null)
   final def stop() = stop(null)
 
-  protected var _serviceState:State = CREATED()
+  protected var _serviceState:State = CREATED
   protected def serviceState = _serviceState
 
   private def error(msg:String) {
@@ -64,11 +64,11 @@ trait BaseService extends Service {
 
   final def start(onCompleted:Runnable) = ^{
     def do_start = {
-      val state = STARTING()
+      val state = new STARTING()
       state << onCompleted
       _serviceState = state
       _start(^{
-        _serviceState = STARTED()
+        _serviceState = STARTED
         state.done
       })
     }
@@ -78,13 +78,13 @@ trait BaseService extends Service {
       }
     }
     _serviceState match {
-      case x:CREATED =>
+      case CREATED =>
         do_start
-      case x:STOPPED =>
+      case STOPPED =>
         do_start
       case state:STARTING =>
         state << onCompleted
-      case state:STARTED =>
+      case STARTED =>
         done
       case state =>
         done
@@ -99,17 +99,17 @@ trait BaseService extends Service {
       }
     }
     _serviceState match {
-      case x:STARTED =>
-        val state = STOPPING()
+      case STARTED =>
+        val state = new STOPPING
         state << onCompleted
         _serviceState = state
         _stop(^{
-          _serviceState = STOPPED()
+          _serviceState = STOPPED
           state.done
         })
       case state:STOPPING =>
         state << onCompleted
-      case state:STOPPED =>
+      case STOPPED =>
         done
       case state =>
         done

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:51:25 2010
@@ -48,7 +48,8 @@ abstract class Connection() extends Tran
   val dispatchQueue = createQueue(id)
   
   def stopped = serviceState match {
-    case STOPPED() | STOPPING() => true
+    case STOPPED => true
+    case x:STOPPING => true
     case _ => false
   }
 
@@ -102,11 +103,9 @@ class BrokerConnection(val broker: Broke
   }
 
   override protected def _stop(onCompleted:Runnable) = {
-    if( !stopped ) {
-      broker.runtime.stopped(this)
-      broker.dispatchQueue.release
-      super._stop(onCompleted)
-    }
+    broker.runtime.stopped(this)
+    broker.dispatchQueue.release
+    super._stop(onCompleted)
   }
 
   override def onTransportConnected() = protocolHandler.onTransportConnected

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml Wed Jul  7 03:51:25 2010
@@ -38,6 +38,12 @@
       <artifactId>activemq-util</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>
+    
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-core</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
 
     <!-- TODO: try to remove this dependency -->
     <dependency>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul  7 03:51:25 2010
@@ -38,6 +38,11 @@
       <artifactId>activemq-util</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-core</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
 
     <!-- Testing Dependencies -->    
     <dependency>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml Wed Jul  7 03:51:25 2010
@@ -38,7 +38,7 @@
       <artifactId>activemq-transport</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>
-    
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:51:25 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.tcp;
 
+import org.apache.activemq.apollo.BaseService;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.wireformat.WireFormat;
@@ -36,15 +37,12 @@ import java.nio.channels.SocketChannel;
 import java.util.LinkedList;
 import java.util.Map;
 
-import static org.apache.activemq.transport.tcp.TcpTransport.SocketState.*;
-import static org.apache.activemq.transport.tcp.TcpTransport.TransportState.*;
-
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class TcpTransport implements Transport {
+public class TcpTransport extends BaseService implements Transport {
 
     static {
         System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
@@ -53,16 +51,116 @@ public class TcpTransport implements Tra
 
     private Map<String, Object> socketOptions;
 
-    enum SocketState {
-        CONNECTING,
-        CONNECTED,
-        DISCONNECTED
+    abstract static class SocketState {
+        void onStop(Runnable onCompleted) {
+        }
+        void onCanceled() {
+        }
+        boolean is(Class<? extends SocketState> clazz) {
+            return getClass()==clazz;
+        }
+    }
+
+    static class DISCONNECTED extends SocketState{}
+
+    class CONNECTING extends SocketState{
+        void onStop(Runnable onCompleted) {
+            trace("CONNECTING.onStop");
+            CANCELING state = new CANCELING();
+            socketState = state;
+            state.onStop(onCompleted);
+        }
+        void onCanceled() {
+            trace("CONNECTING.onCanceled");
+            CANCELING state = new CANCELING();
+            socketState = state;
+            state.onCanceled();
+        }
+    }
+
+    class CONNECTED extends SocketState {
+        void onStop(Runnable onCompleted) {
+            trace("CONNECTED.onStop");
+            CANCELING state = new CANCELING();
+            socketState = state;
+            state.add(createDisconnectTask());
+            state.onStop(onCompleted);
+        }
+        void onCanceled() {
+            trace("CONNECTED.onCanceled");
+            CANCELING state = new CANCELING();
+            socketState = state;
+            state.add(createDisconnectTask());
+            state.onCanceled();
+        }
+        Runnable createDisconnectTask() {
+            return new Runnable(){
+                public void run() {
+                    listener.onTransportDisconnected();
+                }
+            };
+        }
+    }
+
+    class CANCELING extends SocketState {
+        private LinkedList<Runnable> runnables =  new LinkedList<Runnable>();
+        private int remaining;
+        private boolean dispose;
+
+        public CANCELING() {
+            if( readSource!=null ) {
+                remaining++;
+                readSource.cancel();
+            }
+            if( writeSource!=null ) {
+                remaining++;
+                writeSource.cancel();
+            }
+        }
+        void onStop(Runnable onCompleted) {
+            trace("CANCELING.onCompleted");
+            add(onCompleted);
+            dispose = true;
+        }
+        void add(Runnable onCompleted) {
+            if( onCompleted!=null ) {
+                runnables.add(onCompleted);
+            }
+        }
+        void onCanceled() {
+            trace("CANCELING.onCanceled");
+            remaining--;
+            if( remaining!=0 ) {
+                return;
+            }
+            try {
+                channel.close();
+            } catch (IOException ignore) {
+            }
+            socketState = new CANCELED(dispose);
+            for (Runnable runnable : runnables) {
+                runnable.run();
+            }
+            if (dispose) {
+                dispose();
+            }
+        }
     }
 
-    enum TransportState {
-        CREATED,
-        RUNNING,
-        DISPOSED
+    class CANCELED extends SocketState {
+        private boolean disposed;
+
+        public CANCELED(boolean disposed) {
+            this.disposed=disposed;
+        }
+
+        void onStop(Runnable onCompleted) {
+            trace("CANCELED.onStop");
+            if( !disposed ) {
+                disposed = true;
+                dispose();
+            }
+        }
     }
 
     protected URI remoteLocation;
@@ -73,8 +171,7 @@ public class TcpTransport implements Tra
 
     private SocketChannel channel;
 
-    private SocketState socketState = DISCONNECTED;
-    private TransportState transportState = CREATED;
+    private SocketState socketState = new DISCONNECTED();
 
     private DispatchQueue dispatchQueue;
     private DispatchSource readSource;
@@ -88,6 +185,11 @@ public class TcpTransport implements Tra
     protected boolean useLocalHost = true;
     ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
 
+    private final Runnable CANCEL_HANDLER = new Runnable() {
+        public void run() {
+            socketState.onCanceled();
+        }
+    };
 
     static final class OneWay {
         final Object command;
@@ -99,16 +201,28 @@ public class TcpTransport implements Tra
         }
     }
 
-    public void connected(SocketChannel channel) {
+    public void connected(SocketChannel channel) throws IOException {
         this.channel = channel;
+        this.channel.configureBlocking(false);
         this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
-        this.socketState = CONNECTED;
+        this.socketState = new CONNECTED();
     }
 
     public void connecting(URI remoteLocation, URI localLocation) throws IOException {
+        this.channel = SocketChannel.open();
+        this.channel.configureBlocking(false);
         this.remoteLocation = remoteLocation;
         this.localLocation = localLocation;
-        this.socketState = CONNECTING;
+
+        if (localLocation != null) {
+            InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+            channel.socket().bind(localAddress);
+        }
+
+        String host = resolveHostName(remoteLocation.getHost());
+        InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        channel.connect(remoteAddress);
+        this.socketState = new CONNECTING();
     }
 
 
@@ -126,66 +240,51 @@ public class TcpTransport implements Tra
         }
     }
 
-    public void start() throws Exception {
-        start(null);
-    }
-    public void start(Runnable onCompleted) throws Exception {
-        if (dispatchQueue == null) {
-            throw new IllegalArgumentException("dispatchQueue is not set");
-        }
-        if (listener == null) {
-            throw new IllegalArgumentException("listener is not set");
-        }
-        if (transportState != CREATED) {
-            throw new IllegalStateException("start can only be used from the created state");
-        }
-        transportState = RUNNING;
-
-        if (socketState == CONNECTING) {
-            channel = SocketChannel.open();
-        }
-        channel.configureBlocking(false);
-        channel.socket().setSendBufferSize(bufferSize);
-        channel.socket().setReceiveBufferSize(bufferSize);
-        next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
-        outbound_buffer = ByteBuffer.allocate(0);
-
-        if (socketState == CONNECTING) {
-
-            if (localLocation != null) {
-                InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
-                channel.socket().bind(localAddress);
-            }
-
-            String host = resolveHostName(remoteLocation.getHost());
-            InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
-            channel.connect(remoteAddress);
-
-            final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
-            connectSource.setEventHandler(new Runnable() {
-                public void run() {
-                    if (transportState == RUNNING) {
+    public void _start(Runnable onCompleted) {
+        try {
+            if (socketState.is(CONNECTING.class) ) {
+                trace("connecting...");
+                // this allows the connect to complete..
+                readSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
+                readSource.setEventHandler(new Runnable() {
+                    public void run() {
+                        if (getServiceState() != STARTED) {
+                            return;
+                        }
                         try {
-                            socketState = CONNECTED;
+                            trace("connected.");
                             channel.finishConnect();
-                            connectSource.release();
-                            fireConnected();
+                            readSource.setCancelHandler(null);
+                            readSource.release();
+                            readSource=null;
+                            socketState = new CONNECTED();
+                            onConnected();
                         } catch (IOException e) {
                             onTransportFailure(e);
                         }
                     }
-                }
-            });
-            connectSource.resume();
-        } else {
-            fireConnected();
-        }
-        if( onCompleted!=null ) {
-            dispatchQueue.execute(onCompleted);
+                });
+                readSource.setCancelHandler(CANCEL_HANDLER);
+                readSource.resume();
+            } else if (socketState.is(CONNECTED.class) ) {
+                trace("was connected.");
+                onConnected();
+            } else {
+                System.err.println("cannot be started.  socket state is: "+socketState); 
+            }
+        } catch (IOException e) {
+            onTransportFailure(e);
+        } finally {
+            if( onCompleted!=null ) {
+                onCompleted.run();
+            }
         }
-
     }
 
+    public void _stop(final Runnable onCompleted) {
+        trace("stopping.. at state: "+socketState);
+        socketState.onStop(onCompleted);
+    }
 
     protected String resolveHostName(String host) throws UnknownHostException {
         String localName = InetAddress.getLocalHost().getHostName();
@@ -197,15 +296,20 @@ public class TcpTransport implements Tra
         return host;
     }
 
-    private void fireConnected() {
+    private void onConnected() throws SocketException {
 
-        try {
-            channel.socket().setSendBufferSize(bufferSize);
-            channel.socket().setReceiveBufferSize(bufferSize);
-        } catch (SocketException e) {
-        }
+        channel.socket().setSendBufferSize(bufferSize);
+        channel.socket().setReceiveBufferSize(bufferSize);
+
+        next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
+        outbound_buffer = ByteBuffer.allocate(0);
 
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
+        writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
+
+        readSource.setCancelHandler(CANCEL_HANDLER);
+        writeSource.setCancelHandler(CANCEL_HANDLER);
+
         readSource.setEventHandler(new Runnable() {
             public void run() {
                 try {
@@ -215,18 +319,9 @@ public class TcpTransport implements Tra
                 }
             }
         });
-        readSource.setCancelHandler(new Runnable() {
-            public void run() {
-                trace("Read canceled");
-                writeSource.cancel();
-                trace("Canceling write");
-            }
-        });
-
-        writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
         writeSource.setEventHandler(new Runnable() {
             public void run() {
-                if (transportState == RUNNING) {
+                if (getServiceState() == STARTED) {
                     // once the outbound is drained.. we can suspend getting
                     // write events.
                     if (drainOutbound()) {
@@ -235,50 +330,15 @@ public class TcpTransport implements Tra
                 }
             }
         });
-        writeSource.setCancelHandler(new Runnable() {
-            public void run() {
-                trace("Write canceled");
-                writeSource.cancel();
-                trace("Disposeing");
-                dispose();
-            }
-        });
 
         remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         listener.onTransportConnected();
     }
 
 
-    public void stop() throws Exception {
-        stop(null);
-    }
-    public void stop(final Runnable onCompleted) throws Exception {
-        if (transportState != RUNNING) {
-            throw new IllegalStateException("stop can only be used from the started state but was "+transportState);
-        }
-        trace("Canceling read");
-        transportState = DISPOSED;
-        writeSource.setDisposer(new Runnable(){
-            public void run() {
-                trace("running callback: "+onCompleted);
-                if( onCompleted!=null ) {
-                    onCompleted.run();
-                }
-            }
-        });
-        readSource.cancel();
-    }
 
     private void dispose() {
 
-        assert dispatchQueue!=null;
-        assert Dispatch.getCurrentQueue() == dispatchQueue;
-
-        try {
-            channel.close();
-        } catch (IOException ignore) {
-        }
-        listener.onTransportDisconnected();
 //        OneWay oneWay = outbound.poll();
 //        while (oneWay != null) {
 //            if (oneWay.retained != null) {
@@ -295,12 +355,8 @@ public class TcpTransport implements Tra
     }
 
     public void onTransportFailure(IOException error) {
-        if( socketState == CONNECTED ) {
-            socketState = DISCONNECTED;
-            listener.onTransportFailure(error);
-            readSource.cancel();
-            writeSource.cancel();
-        }
+        listener.onTransportFailure(error);
+        socketState.onCanceled();
     }
 
 
@@ -311,10 +367,10 @@ public class TcpTransport implements Tra
     public void oneway(Object command, Retained retained) {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
-            if (socketState != CONNECTED) {
+            if (!socketState.is(CONNECTED.class)) {
                 throw new IOException("Not connected.");
             }
-            if (transportState != RUNNING) {
+            if (getServiceState() != STARTED) {
                 throw new IOException("Not running.");
             }
         } catch (IOException e) {
@@ -348,7 +404,7 @@ public class TcpTransport implements Tra
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
 
-            while (socketState == CONNECTED) {
+            while (socketState.is(CONNECTED.class) ) {
 
                 // if we have a pending write that is being sent over the socket...
                 if (outbound_buffer.remaining()!=0) {
@@ -394,7 +450,7 @@ public class TcpTransport implements Tra
     }
 
     private void drainInbound() throws IOException {
-        if (transportState == DISPOSED || readSource.isSuspended()) {
+        if (getServiceState() == STARTED || readSource.isSuspended()) {
             return;
         }
         while (true) {
@@ -436,7 +492,7 @@ public class TcpTransport implements Tra
                 listener.onTransportCommand(command);
 
                 // the transport may be suspended after processing a command.
-                if (transportState == DISPOSED || readSource.isSuspended()) {
+                if (getServiceState() == STOPPED || readSource.isSuspended()) {
                     return;
                 }
             }
@@ -458,7 +514,7 @@ public class TcpTransport implements Tra
 
     private boolean assertConnected() {
         try {
-            if (socketState != CONNECTED) {
+            if ( !isConnected() ) {
                 throw new IOException("Not connected.");
             }
             return true;
@@ -502,11 +558,11 @@ public class TcpTransport implements Tra
     }
 
     public boolean isConnected() {
-        return socketState == CONNECTED;
+        return socketState.is(CONNECTED.class);
     }
 
     public boolean isDisposed() {
-        return transportState == DISPOSED;
+        return getServiceState() == STOPPED;
     }
 
     public boolean isFaultTolerant() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml Wed Jul  7 03:51:25 2010
@@ -46,6 +46,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-core</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.fusesource.hawtdispatch</groupId>
       <artifactId>hawtdispatch</artifactId>
       <version>${hawtdispatch-version}</version>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml Wed Jul  7 03:51:25 2010
@@ -47,6 +47,14 @@
       <groupId>org.fusesource.hawtbuf</groupId>
       <artifactId>hawtbuf-core</artifactId>
       <version>${hawtbuf-version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch</artifactId>
+      <version>${hawtdispatch-version}</version>
+      <optional>true</optional>
     </dependency>
 
     <dependency>

Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java?rev=961089&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java Wed Jul  7 03:51:25 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import org.apache.activemq.Service;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+import java.util.LinkedList;
+
+/**
+ * <p>
+ * The BaseService provides helpers for dealing async service state.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class BaseService implements Service {
+
+    static class State {
+        public String toString() {
+            return getClass().getSimpleName();
+        }
+    }
+
+    static class CallbackSupport extends State {
+        LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
+
+        void add(Runnable r) {
+            if (r != null) {
+                callbacks.add(r);
+            }
+        }
+
+        void done() {
+            for (Runnable callback : callbacks) {
+                callback.run();
+            }
+        }
+    }
+
+    public static final State CREATED = new State();
+    public static class STARTING extends CallbackSupport {
+    }
+    public static final State STARTED = new State();
+    public static class STOPPING extends CallbackSupport {
+    }
+
+    public static final State STOPPED = new State();
+
+
+    protected State _serviceState = CREATED;
+
+    final public void start() {
+        start(null);
+    }
+
+    final public void stop() {
+        stop(null);
+    }
+
+    final public void start(final Runnable onCompleted) {
+        getDispatchQueue().execute(new Runnable() {
+            public void run() {
+                if (_serviceState == CREATED ||
+                        _serviceState == STOPPED) {
+                    final STARTING state = new STARTING();
+                    state.add(onCompleted);
+                    _serviceState = state;
+                    _start(new Runnable() {
+                        public void run() {
+                            _serviceState = STARTED;
+                            state.done();
+                        }
+                    });
+                } else if (_serviceState instanceof STARTING) {
+                    ((STARTING) _serviceState).add(onCompleted);
+                } else if (_serviceState == STARTED) {
+                    if (onCompleted != null) {
+                        onCompleted.run();
+                    }
+                } else {
+                    if (onCompleted != null) {
+                        onCompleted.run();
+                    }
+                    error("start should not be called from state: " + _serviceState);
+                }
+            }
+        });
+    }
+
+    final public void stop(final Runnable onCompleted) {
+        getDispatchQueue().execute(new Runnable() {
+            public void run() {
+                if (_serviceState == STARTED) {
+                    final STOPPING state = new STOPPING();
+                    state.add(onCompleted);
+                    _serviceState = state;
+                    _stop(new Runnable() {
+                        public void run() {
+                            _serviceState = STOPPED;
+                            state.done();
+                        }
+                    });
+                } else if (_serviceState instanceof STOPPING) {
+                    ((STARTING) _serviceState).add(onCompleted);
+                } else if (_serviceState == STOPPED) {
+                    if (onCompleted != null) {
+                        onCompleted.run();
+                    }
+                } else {
+                    if (onCompleted != null) {
+                        onCompleted.run();
+                    }
+                    error("stop should not be called from state: " + _serviceState);
+                }
+            }
+        });
+    }
+
+    private void error(String msg) {
+        try {
+            throw new AssertionError(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected State getServiceState() {
+        return _serviceState;
+    }
+
+    abstract protected DispatchQueue getDispatchQueue();
+
+    abstract protected void _start(Runnable onCompleted);
+
+    abstract protected void _stop(Runnable onCompleted);
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul  7 03:51:25 2010
@@ -89,7 +89,6 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <activemq-protobuf-version>1.1-SNAPSHOT</activemq-protobuf-version>
     <hawtdispatch-version>1.0-SNAPSHOT</hawtdispatch-version>
     <hawtdb-version>1.1-SNAPSHOT</hawtdb-version>
     <hawtbuf-version>1.0-SNAPSHOT</hawtbuf-version>
@@ -245,11 +244,6 @@
           <version>1.0</version>
         </plugin>
         <plugin>
-          <groupId>org.fusesource.hawtbuf.proto</groupId>
-          <artifactId>activemq-protobuf</artifactId>
-          <version>1.1-SNAPSHOT</version>
-        </plugin>
-        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-antrun-plugin</artifactId>
           <version>1.1</version>