You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:51:43 UTC
svn commit: r961090 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-stomp/src/main/scala/org/apache/activemq...
Author: chirino
Date: Wed Jul 7 03:51:43 2010
New Revision: 961090
URL: http://svn.apache.org/viewvc?rev=961090&view=rev
Log:
pick up recent changes in hawtdispatch.. reduced logging verbosity.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 03:51:43 2010
@@ -90,7 +90,7 @@ trait BaseService extends Service {
done
error("start should not be called from state: "+state);
}
- } ->: dispatchQueue
+ } |>>: dispatchQueue
final def stop(onCompleted:Runnable) = ^{
def done = {
@@ -115,9 +115,9 @@ trait BaseService extends Service {
done
error("stop should not be called from state: "+state);
}
- } ->: dispatchQueue
+ } |>>: dispatchQueue
protected def _start(onCompleted:Runnable)
protected def _stop(onCompleted:Runnable)
-}
\ No newline at end of file
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:51:43 2010
@@ -198,50 +198,50 @@ class Broker() extends Service with Disp
if( clientConnections.remove(connection) ) {
connection.dispatchQueue.release
}
- } ->: dispatchQueue
+ } >>: dispatchQueue
def removeConnectUri(uri: String): Unit = ^ {
connectUris.remove(uri)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
virtualHosts.get(name)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
new ArrayList(connectUris)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
defaultVirtualHost
- } ->: dispatchQueue
+ } >>: dispatchQueue
def addVirtualHost(host: VirtualHost) = ^ {
Broker.this.addVirtualHost(host)
- } ->: dispatchQueue
+ } >>: dispatchQueue
- def getState(cb: (String) => Unit) = callback(cb) {state} ->: dispatchQueue
+ def getState(cb: (String) => Unit) = callback(cb) {state} >>: dispatchQueue
def addConnectUri(uri: String) = ^ {
connectUris.add(uri)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getName(cb: (String) => Unit) = callback(cb) {
name;
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
new ArrayList[VirtualHost](virtualHosts.values)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
new ArrayList[TransportServer](transportServers)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def start(onCompleted:Runnable) = ^ {
_start(onCompleted)
- } ->: dispatchQueue
+ } >>: dispatchQueue
def _start(onCompleted:Runnable) = {
if (state == CONFIGURATION) {
@@ -314,7 +314,7 @@ class Broker() extends Service with Disp
}
}
- } ->: dispatchQueue
+ } >>: dispatchQueue
}
@@ -389,7 +389,7 @@ class Queue(val destination:Destination)
def deliver(value:Delivery):Unit = {
val delivery = Delivery(value)
delivery.setDisposer(^{
- ^{ completed(value) } ->:queue
+ ^{ completed(value) } >>:queue
})
consumer.deliver(delivery);
delivery.release
@@ -415,7 +415,7 @@ class Queue(val destination:Destination)
readyConsumers.addLast(cs)
}
drain_delivery_buffer
- } ->: queue
+ } >>: queue
def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
for ( consumer <- consumers ) {
@@ -428,7 +428,7 @@ class Queue(val destination:Destination)
case None=>
}
}
- } ->: queue
+ } >>: queue
def disconnected() = throw new RuntimeException("unsupported")
@@ -471,7 +471,7 @@ class Queue(val destination:Destination)
//
// def deliver(delivery:Delivery) = using(delivery) {
// deliveryQueue.send(delivery)
-// } ->: queue
+// } >>: queue
//
// def close = {
// release
@@ -601,4 +601,4 @@ class XQueue(val destination:Destination
def getDestination() = destination
def shutdown = {}
-}
\ No newline at end of file
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala Wed Jul 7 03:51:43 2010
@@ -50,7 +50,7 @@ class CompletionTracker(val name:String,
^ {
assert(_callback==null)
tasks.add(rc)
- } ->: queue
+ } >>: queue
return rc
}
@@ -59,7 +59,7 @@ class CompletionTracker(val name:String,
^ {
_callback = handler _
checkDone()
- } ->: queue
+ } >>: queue
def displayStatus = {
if( _callback!=null ) {
@@ -76,12 +76,12 @@ class CompletionTracker(val name:String,
if( tasks.remove(r) ) {
checkDone()
}
- } ->: queue
+ } >>: queue
private def checkDone() = {
if( tasks.isEmpty && _callback!=null ) {
trace("executing callback for %s", name)
- _callback ->: queue
+ _callback >>: queue
_callback = null
queue.release
} else {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:51:43 2010
@@ -548,6 +548,7 @@ class DeliverySessionManager(val sink:De
});
credit_adder.resume
source.retain
+ var closed = false
private var credits = 0;
@@ -557,25 +558,28 @@ class DeliverySessionManager(val sink:De
def close = {
credit_adder.release
source.release
+ closed=true
}
override def full = credits <= 0
override protected def send_to_delivery_buffer(value:Delivery) = {
- var delivery = Delivery(value)
- credit_adder.retain
- delivery.setDisposer(^{
- // This is called from the server/consumer thread
- credit_adder.merge(delivery.size);
- credit_adder.release
- })
- internal_credit(-delivery.size)
- source.merge(delivery)
+ if( !closed ) {
+ var delivery = Delivery(value)
+ credit_adder.retain
+ delivery.setDisposer(^{
+ // This is called from the server/consumer thread
+ credit_adder.merge(delivery.size);
+ credit_adder.release
+ })
+ internal_credit(-delivery.size)
+ source.merge(delivery)
+ }
}
def internal_credit(value:Int) = {
credits += value;
- if( credits <= 0 ) {
+ if( closed || credits <= 0 ) {
credits = 0
} else {
drainOverflow
@@ -585,7 +589,7 @@ class DeliverySessionManager(val sink:De
///////////////////////////////////////////////////
// These methods get called from the server/consumer thread...
///////////////////////////////////////////////////
- def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue
+ def credit(value:Int) = ^{ internal_credit(value) } >>: producer_queue
def drain(callback:Runnable) = {
credits = 0
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:51:43 2010
@@ -156,13 +156,13 @@ class Router(var queue:DispatchQueue) ex
def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
get(destination).on_bind(targets)
- } ->: queue
+ } >>: queue
def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
if( get(destination).on_unbind(targets) ) {
destinations.remove(destination)
}
- } ->: queue
+ } >>: queue
def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
@@ -172,7 +172,7 @@ class Router(var queue:DispatchQueue) ex
}
^ {
get(destination).on_connect(route)
- } ->: queue
+ } >>: queue
}
def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
@@ -180,7 +180,7 @@ class Router(var queue:DispatchQueue) ex
def disconnect(route:DeliveryProducerRoute) = releasing(route) {
get(route.destination).on_disconnect(route)
- } ->: queue
+ } >>: queue
def each(proc:(Destination, DestinationNode)=>Unit) = {
@@ -227,11 +227,11 @@ class DeliveryProducerRoute(val destinat
def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
on_connected
- } ->: queue
+ } >>: queue
def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
- } ->: queue
+ } >>: queue
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
@@ -249,7 +249,7 @@ class DeliveryProducerRoute(val destinat
}
rc
}
- } ->: queue
+ } >>: queue
def disconnected() = ^ {
this.targets.foreach { x=>
@@ -257,7 +257,7 @@ class DeliveryProducerRoute(val destinat
x.close
x.consumer.release
}
- } ->: queue
+ } >>: queue
protected def on_connected = {}
protected def on_disconnected = {}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:51:43 2010
@@ -35,8 +35,8 @@ import org.scalatest._
import _root_.org.fusesource.hawtbuf._
object BaseBrokerPerfSupport {
- var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "1"))
- var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+ var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000"))
var IO_WORK_AMOUNT = 0
var FANIN_COUNT = 10
var FANOUT_COUNT = 10
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:51:43 2010
@@ -125,7 +125,7 @@ class StompProtocolHandler extends Proto
host.router.unbind(consumer.destination, consumer::Nil)
consumer=null
}
- info("stomp protocol resources released")
+ trace("stomp protocol resources released")
}
}
@@ -199,7 +199,7 @@ class StompProtocolHandler extends Proto
// read_source.setTargetQueue(queue)
// }
- } ->: queue
+ } >>: queue
}
// don't process frames until we are connected..
@@ -280,12 +280,13 @@ class StompProtocolHandler extends Proto
connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
^ {
connection.stop()
- } ->: queue
+ } >>: queue
}
}
override def onTransportFailure(error: IOException) = {
if( !connection.stopped ) {
+ connection.transport.suspendRead
info(error, "Shutting connection down due to: %s", error)
super.onTransportFailure(error);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:51:43 2010
@@ -44,9 +44,6 @@ import java.util.Map;
*/
public class TcpTransport extends BaseService implements Transport {
- static {
- System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
- }
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
private Map<String, Object> socketOptions;
@@ -450,7 +447,7 @@ public class TcpTransport extends BaseSe
}
private void drainInbound() throws IOException {
- if (getServiceState() == STARTED || readSource.isSuspended()) {
+ if (!getServiceState().isStarted() || readSource.isSuspended()) {
return;
}
while (true) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java Wed Jul 7 03:51:43 2010
@@ -30,10 +30,13 @@ import java.util.LinkedList;
*/
public abstract class BaseService implements Service {
- static class State {
+ public static class State {
public String toString() {
return getClass().getSimpleName();
}
+ public boolean isStarted() {
+ return false;
+ }
}
static class CallbackSupport extends State {
@@ -55,7 +58,11 @@ public abstract class BaseService implem
public static final State CREATED = new State();
public static class STARTING extends CallbackSupport {
}
- public static final State STARTED = new State();
+ public static final State STARTED = new State() {
+ public boolean isStarted() {
+ return true;
+ }
+ };
public static class STOPPING extends CallbackSupport {
}