You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:51:26 UTC
svn commit: r961089 - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-selector/ activemq-store/ activemq-tcp/
activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/ active...
Author: chirino
Date: Wed Jul 7 03:51:25 2010
New Revision: 961089
URL: http://svn.apache.org/viewvc?rev=961089&view=rev
Log:
tcp transport has much improved state tracking and cleanup.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
activemq/sandbox/activemq-apollo-actor/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 03:51:25 2010
@@ -39,18 +39,18 @@ trait BaseService extends Service {
def done = callbacks.foreach(_.run)
}
- case class CREATED extends State
- case class STARTING extends State with CallbackSupport
- case class STARTED extends State
- case class STOPPING extends State with CallbackSupport
- case class STOPPED extends State
+ object CREATED extends State
+ class STARTING extends State with CallbackSupport
+ object STARTED extends State
+ class STOPPING extends State with CallbackSupport
+ object STOPPED extends State
val dispatchQueue:DispatchQueue
final def start() = start(null)
final def stop() = stop(null)
- protected var _serviceState:State = CREATED()
+ protected var _serviceState:State = CREATED
protected def serviceState = _serviceState
private def error(msg:String) {
@@ -64,11 +64,11 @@ trait BaseService extends Service {
final def start(onCompleted:Runnable) = ^{
def do_start = {
- val state = STARTING()
+ val state = new STARTING()
state << onCompleted
_serviceState = state
_start(^{
- _serviceState = STARTED()
+ _serviceState = STARTED
state.done
})
}
@@ -78,13 +78,13 @@ trait BaseService extends Service {
}
}
_serviceState match {
- case x:CREATED =>
+ case CREATED =>
do_start
- case x:STOPPED =>
+ case STOPPED =>
do_start
case state:STARTING =>
state << onCompleted
- case state:STARTED =>
+ case STARTED =>
done
case state =>
done
@@ -99,17 +99,17 @@ trait BaseService extends Service {
}
}
_serviceState match {
- case x:STARTED =>
- val state = STOPPING()
+ case STARTED =>
+ val state = new STOPPING
state << onCompleted
_serviceState = state
_stop(^{
- _serviceState = STOPPED()
+ _serviceState = STOPPED
state.done
})
case state:STOPPING =>
state << onCompleted
- case state:STOPPED =>
+ case STOPPED =>
done
case state =>
done
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:51:25 2010
@@ -48,7 +48,8 @@ abstract class Connection() extends Tran
val dispatchQueue = createQueue(id)
def stopped = serviceState match {
- case STOPPED() | STOPPING() => true
+ case STOPPED => true
+ case x:STOPPING => true
case _ => false
}
@@ -102,11 +103,9 @@ class BrokerConnection(val broker: Broke
}
override protected def _stop(onCompleted:Runnable) = {
- if( !stopped ) {
- broker.runtime.stopped(this)
- broker.dispatchQueue.release
- super._stop(onCompleted)
- }
+ broker.runtime.stopped(this)
+ broker.dispatchQueue.release
+ super._stop(onCompleted)
}
override def onTransportConnected() = protocolHandler.onTransportConnected
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/pom.xml Wed Jul 7 03:51:25 2010
@@ -38,6 +38,12 @@
<artifactId>activemq-util</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
+
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-core</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
<!-- TODO: try to remove this dependency -->
<dependency>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul 7 03:51:25 2010
@@ -38,6 +38,11 @@
<artifactId>activemq-util</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-core</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/pom.xml Wed Jul 7 03:51:25 2010
@@ -38,7 +38,7 @@
<artifactId>activemq-transport</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
-
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:51:25 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.tcp;
+import org.apache.activemq.apollo.BaseService;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.wireformat.WireFormat;
@@ -36,15 +37,12 @@ import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.Map;
-import static org.apache.activemq.transport.tcp.TcpTransport.SocketState.*;
-import static org.apache.activemq.transport.tcp.TcpTransport.TransportState.*;
-
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class TcpTransport implements Transport {
+public class TcpTransport extends BaseService implements Transport {
static {
System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
@@ -53,16 +51,116 @@ public class TcpTransport implements Tra
private Map<String, Object> socketOptions;
- enum SocketState {
- CONNECTING,
- CONNECTED,
- DISCONNECTED
+ abstract static class SocketState {
+ void onStop(Runnable onCompleted) {
+ }
+ void onCanceled() {
+ }
+ boolean is(Class<? extends SocketState> clazz) {
+ return getClass()==clazz;
+ }
+ }
+
+ static class DISCONNECTED extends SocketState{}
+
+ class CONNECTING extends SocketState{
+ void onStop(Runnable onCompleted) {
+ trace("CONNECTING.onStop");
+ CANCELING state = new CANCELING();
+ socketState = state;
+ state.onStop(onCompleted);
+ }
+ void onCanceled() {
+ trace("CONNECTING.onCanceled");
+ CANCELING state = new CANCELING();
+ socketState = state;
+ state.onCanceled();
+ }
+ }
+
+ class CONNECTED extends SocketState {
+ void onStop(Runnable onCompleted) {
+ trace("CONNECTED.onStop");
+ CANCELING state = new CANCELING();
+ socketState = state;
+ state.add(createDisconnectTask());
+ state.onStop(onCompleted);
+ }
+ void onCanceled() {
+ trace("CONNECTED.onCanceled");
+ CANCELING state = new CANCELING();
+ socketState = state;
+ state.add(createDisconnectTask());
+ state.onCanceled();
+ }
+ Runnable createDisconnectTask() {
+ return new Runnable(){
+ public void run() {
+ listener.onTransportDisconnected();
+ }
+ };
+ }
+ }
+
+ class CANCELING extends SocketState {
+ private LinkedList<Runnable> runnables = new LinkedList<Runnable>();
+ private int remaining;
+ private boolean dispose;
+
+ public CANCELING() {
+ if( readSource!=null ) {
+ remaining++;
+ readSource.cancel();
+ }
+ if( writeSource!=null ) {
+ remaining++;
+ writeSource.cancel();
+ }
+ }
+ void onStop(Runnable onCompleted) {
+ trace("CANCELING.onCompleted");
+ add(onCompleted);
+ dispose = true;
+ }
+ void add(Runnable onCompleted) {
+ if( onCompleted!=null ) {
+ runnables.add(onCompleted);
+ }
+ }
+ void onCanceled() {
+ trace("CANCELING.onCanceled");
+ remaining--;
+ if( remaining!=0 ) {
+ return;
+ }
+ try {
+ channel.close();
+ } catch (IOException ignore) {
+ }
+ socketState = new CANCELED(dispose);
+ for (Runnable runnable : runnables) {
+ runnable.run();
+ }
+ if (dispose) {
+ dispose();
+ }
+ }
}
- enum TransportState {
- CREATED,
- RUNNING,
- DISPOSED
+ class CANCELED extends SocketState { // terminal state: entered after the dispatch sources were canceled and the channel closed
+ private boolean disposed;
+
+ public CANCELED(boolean disposed) {
+ this.disposed=disposed;
+ }
+ void onStop(Runnable onCompleted) {
+ trace("CANCELED.onStop");
+ if( !disposed ) {
+ disposed = true;
+ dispose();
+ }
+ if( onCompleted!=null ) { onCompleted.run(); } // nothing left to cancel: complete the stop now, otherwise the caller waits forever
+ }
}
protected URI remoteLocation;
@@ -73,8 +171,7 @@ public class TcpTransport implements Tra
private SocketChannel channel;
- private SocketState socketState = DISCONNECTED;
- private TransportState transportState = CREATED;
+ private SocketState socketState = new DISCONNECTED();
private DispatchQueue dispatchQueue;
private DispatchSource readSource;
@@ -88,6 +185,11 @@ public class TcpTransport implements Tra
protected boolean useLocalHost = true;
ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+ private final Runnable CANCEL_HANDLER = new Runnable() {
+ public void run() {
+ socketState.onCanceled();
+ }
+ };
static final class OneWay {
final Object command;
@@ -99,16 +201,28 @@ public class TcpTransport implements Tra
}
}
- public void connected(SocketChannel channel) {
+ public void connected(SocketChannel channel) throws IOException {
this.channel = channel;
+ this.channel.configureBlocking(false);
this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
- this.socketState = CONNECTED;
+ this.socketState = new CONNECTED();
}
public void connecting(URI remoteLocation, URI localLocation) throws IOException {
+ this.channel = SocketChannel.open();
+ this.channel.configureBlocking(false);
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
- this.socketState = CONNECTING;
+
+ if (localLocation != null) {
+ InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+ channel.socket().bind(localAddress);
+ }
+
+ String host = resolveHostName(remoteLocation.getHost());
+ InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+ channel.connect(remoteAddress);
+ this.socketState = new CONNECTING();
}
@@ -126,66 +240,51 @@ public class TcpTransport implements Tra
}
}
- public void start() throws Exception {
- start(null);
- }
- public void start(Runnable onCompleted) throws Exception {
- if (dispatchQueue == null) {
- throw new IllegalArgumentException("dispatchQueue is not set");
- }
- if (listener == null) {
- throw new IllegalArgumentException("listener is not set");
- }
- if (transportState != CREATED) {
- throw new IllegalStateException("start can only be used from the created state");
- }
- transportState = RUNNING;
-
- if (socketState == CONNECTING) {
- channel = SocketChannel.open();
- }
- channel.configureBlocking(false);
- channel.socket().setSendBufferSize(bufferSize);
- channel.socket().setReceiveBufferSize(bufferSize);
- next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
- outbound_buffer = ByteBuffer.allocate(0);
-
- if (socketState == CONNECTING) {
-
- if (localLocation != null) {
- InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
- channel.socket().bind(localAddress);
- }
-
- String host = resolveHostName(remoteLocation.getHost());
- InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
- channel.connect(remoteAddress);
-
- final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
- connectSource.setEventHandler(new Runnable() {
- public void run() {
- if (transportState == RUNNING) {
+ public void _start(Runnable onCompleted) {
+ try {
+ if (socketState.is(CONNECTING.class) ) {
+ trace("connecting...");
+ // this allows the connect to complete..
+ readSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
+ readSource.setEventHandler(new Runnable() {
+ public void run() {
+ if (getServiceState() != STARTED) {
+ return;
+ }
try {
- socketState = CONNECTED;
+ trace("connected.");
channel.finishConnect();
- connectSource.release();
- fireConnected();
+ readSource.setCancelHandler(null);
+ readSource.release();
+ readSource=null;
+ socketState = new CONNECTED();
+ onConnected();
} catch (IOException e) {
onTransportFailure(e);
}
}
- }
- });
- connectSource.resume();
- } else {
- fireConnected();
- }
- if( onCompleted!=null ) {
- dispatchQueue.execute(onCompleted);
+ });
+ readSource.setCancelHandler(CANCEL_HANDLER);
+ readSource.resume();
+ } else if (socketState.is(CONNECTED.class) ) {
+ trace("was connected.");
+ onConnected();
+ } else {
+ System.err.println("cannot be started. socket state is: "+socketState);
+ }
+ } catch (IOException e) {
+ onTransportFailure(e);
+ } finally {
+ if( onCompleted!=null ) {
+ onCompleted.run();
+ }
}
-
}
+ public void _stop(final Runnable onCompleted) {
+ trace("stopping.. at state: "+socketState);
+ socketState.onStop(onCompleted);
+ }
protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName();
@@ -197,15 +296,20 @@ public class TcpTransport implements Tra
return host;
}
- private void fireConnected() {
+ private void onConnected() throws SocketException {
- try {
- channel.socket().setSendBufferSize(bufferSize);
- channel.socket().setReceiveBufferSize(bufferSize);
- } catch (SocketException e) {
- }
+ channel.socket().setSendBufferSize(bufferSize);
+ channel.socket().setReceiveBufferSize(bufferSize);
+
+ next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
+ outbound_buffer = ByteBuffer.allocate(0);
readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
+ writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
+
+ readSource.setCancelHandler(CANCEL_HANDLER);
+ writeSource.setCancelHandler(CANCEL_HANDLER);
+
readSource.setEventHandler(new Runnable() {
public void run() {
try {
@@ -215,18 +319,9 @@ public class TcpTransport implements Tra
}
}
});
- readSource.setCancelHandler(new Runnable() {
- public void run() {
- trace("Read canceled");
- writeSource.cancel();
- trace("Canceling write");
- }
- });
-
- writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
writeSource.setEventHandler(new Runnable() {
public void run() {
- if (transportState == RUNNING) {
+ if (getServiceState() == STARTED) {
// once the outbound is drained.. we can suspend getting
// write events.
if (drainOutbound()) {
@@ -235,50 +330,15 @@ public class TcpTransport implements Tra
}
}
});
- writeSource.setCancelHandler(new Runnable() {
- public void run() {
- trace("Write canceled");
- writeSource.cancel();
- trace("Disposeing");
- dispose();
- }
- });
remoteAddress = channel.socket().getRemoteSocketAddress().toString();
listener.onTransportConnected();
}
- public void stop() throws Exception {
- stop(null);
- }
- public void stop(final Runnable onCompleted) throws Exception {
- if (transportState != RUNNING) {
- throw new IllegalStateException("stop can only be used from the started state but was "+transportState);
- }
- trace("Canceling read");
- transportState = DISPOSED;
- writeSource.setDisposer(new Runnable(){
- public void run() {
- trace("running callback: "+onCompleted);
- if( onCompleted!=null ) {
- onCompleted.run();
- }
- }
- });
- readSource.cancel();
- }
private void dispose() {
- assert dispatchQueue!=null;
- assert Dispatch.getCurrentQueue() == dispatchQueue;
-
- try {
- channel.close();
- } catch (IOException ignore) {
- }
- listener.onTransportDisconnected();
// OneWay oneWay = outbound.poll();
// while (oneWay != null) {
// if (oneWay.retained != null) {
@@ -295,12 +355,8 @@ public class TcpTransport implements Tra
}
public void onTransportFailure(IOException error) {
- if( socketState == CONNECTED ) {
- socketState = DISCONNECTED;
- listener.onTransportFailure(error);
- readSource.cancel();
- writeSource.cancel();
- }
+ listener.onTransportFailure(error);
+ socketState.onCanceled();
}
@@ -311,10 +367,10 @@ public class TcpTransport implements Tra
public void oneway(Object command, Retained retained) {
assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
- if (socketState != CONNECTED) {
+ if (!socketState.is(CONNECTED.class)) {
throw new IOException("Not connected.");
}
- if (transportState != RUNNING) {
+ if (getServiceState() != STARTED) {
throw new IOException("Not running.");
}
} catch (IOException e) {
@@ -348,7 +404,7 @@ public class TcpTransport implements Tra
assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
- while (socketState == CONNECTED) {
+ while (socketState.is(CONNECTED.class) ) {
// if we have a pending write that is being sent over the socket...
if (outbound_buffer.remaining()!=0) {
@@ -394,7 +450,7 @@ public class TcpTransport implements Tra
}
private void drainInbound() throws IOException {
- if (transportState == DISPOSED || readSource.isSuspended()) {
+ if (getServiceState() != STARTED || readSource.isSuspended()) { // only drain while the service is running (the condition was inverted)
return;
}
while (true) {
@@ -436,7 +492,7 @@ public class TcpTransport implements Tra
listener.onTransportCommand(command);
// the transport may be suspended after processing a command.
- if (transportState == DISPOSED || readSource.isSuspended()) {
+ if (getServiceState() == STOPPED || readSource.isSuspended()) {
return;
}
}
@@ -458,7 +514,7 @@ public class TcpTransport implements Tra
private boolean assertConnected() {
try {
- if (socketState != CONNECTED) {
+ if ( !isConnected() ) {
throw new IOException("Not connected.");
}
return true;
@@ -502,11 +558,11 @@ public class TcpTransport implements Tra
}
public boolean isConnected() {
- return socketState == CONNECTED;
+ return socketState.is(CONNECTED.class);
}
public boolean isDisposed() {
- return transportState == DISPOSED;
+ return getServiceState() == STOPPED;
}
public boolean isFaultTolerant() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml Wed Jul 7 03:51:25 2010
@@ -46,6 +46,12 @@
</dependency>
<dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-core</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch</artifactId>
<version>${hawtdispatch-version}</version>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml Wed Jul 7 03:51:25 2010
@@ -47,6 +47,14 @@
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf-core</artifactId>
<version>${hawtbuf-version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.hawtdispatch</groupId>
+ <artifactId>hawtdispatch</artifactId>
+ <version>${hawtdispatch-version}</version>
+ <optional>true</optional>
</dependency>
<dependency>
Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java?rev=961089&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java Wed Jul 7 03:51:25 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import org.apache.activemq.Service;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+import java.util.LinkedList;
+
+/**
+ * <p>
+ * The BaseService provides helpers for dealing async service state.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class BaseService implements Service {
+
+ static class State {
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+ }
+
+ static class CallbackSupport extends State {
+ LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
+
+ void add(Runnable r) {
+ if (r != null) {
+ callbacks.add(r);
+ }
+ }
+
+ void done() {
+ for (Runnable callback : callbacks) {
+ callback.run();
+ }
+ }
+ }
+
+ public static final State CREATED = new State();
+ public static class STARTING extends CallbackSupport {
+ }
+ public static final State STARTED = new State();
+ public static class STOPPING extends CallbackSupport {
+ }
+
+ public static final State STOPPED = new State();
+
+
+ protected State _serviceState = CREATED;
+
+ final public void start() {
+ start(null);
+ }
+
+ final public void stop() {
+ stop(null);
+ }
+
+ final public void start(final Runnable onCompleted) {
+ getDispatchQueue().execute(new Runnable() {
+ public void run() {
+ if (_serviceState == CREATED ||
+ _serviceState == STOPPED) {
+ final STARTING state = new STARTING();
+ state.add(onCompleted);
+ _serviceState = state;
+ _start(new Runnable() {
+ public void run() {
+ _serviceState = STARTED;
+ state.done();
+ }
+ });
+ } else if (_serviceState instanceof STARTING) {
+ ((STARTING) _serviceState).add(onCompleted);
+ } else if (_serviceState == STARTED) {
+ if (onCompleted != null) {
+ onCompleted.run();
+ }
+ } else {
+ if (onCompleted != null) {
+ onCompleted.run();
+ }
+ error("start should not be called from state: " + _serviceState);
+ }
+ }
+ });
+ }
+
+ final public void stop(final Runnable onCompleted) { // onCompleted may be null; the state transition is submitted to the service's dispatch queue
+ getDispatchQueue().execute(new Runnable() {
+ public void run() {
+ if (_serviceState == STARTED) {
+ final STOPPING state = new STOPPING();
+ state.add(onCompleted);
+ _serviceState = state;
+ _stop(new Runnable() {
+ public void run() {
+ _serviceState = STOPPED;
+ state.done(); // runs every callback queued while stopping
+ }
+ });
+ } else if (_serviceState instanceof STOPPING) {
+ ((STOPPING) _serviceState).add(onCompleted); // a stop is already in flight: queue the callback (the old cast to STARTING threw ClassCastException)
+ } else if (_serviceState == STOPPED) {
+ if (onCompleted != null) {
+ onCompleted.run();
+ }
+ } else {
+ if (onCompleted != null) {
+ onCompleted.run();
+ }
+ error("stop should not be called from state: " + _serviceState);
+ }
+ }
+ });
+ }
+
+ private void error(String msg) { // reports an illegal state transition by printing a stack trace; does not propagate
+ try {
+ throw new AssertionError(msg);
+ } catch (AssertionError e) { // AssertionError is an Error, not an Exception: the old catch (Exception e) never matched
+ e.printStackTrace();
+ }
+ }
+
+ protected State getServiceState() {
+ return _serviceState;
+ }
+
+ abstract protected DispatchQueue getDispatchQueue();
+
+ abstract protected void _start(Runnable onCompleted);
+
+ abstract protected void _stop(Runnable onCompleted);
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961089&r1=961088&r2=961089&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul 7 03:51:25 2010
@@ -89,7 +89,6 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <activemq-protobuf-version>1.1-SNAPSHOT</activemq-protobuf-version>
<hawtdispatch-version>1.0-SNAPSHOT</hawtdispatch-version>
<hawtdb-version>1.1-SNAPSHOT</hawtdb-version>
<hawtbuf-version>1.0-SNAPSHOT</hawtbuf-version>
@@ -245,11 +244,6 @@
<version>1.0</version>
</plugin>
<plugin>
- <groupId>org.fusesource.hawtbuf.proto</groupId>
- <artifactId>activemq-protobuf</artifactId>
- <version>1.1-SNAPSHOT</version>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.1</version>