You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:51:43 UTC

svn commit: r961090 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-stomp/src/main/scala/org/apache/activemq...

Author: chirino
Date: Wed Jul  7 03:51:43 2010
New Revision: 961090

URL: http://svn.apache.org/viewvc?rev=961090&view=rev
Log:
pick up recent changes in hawtdispatch.. reduced logging verbosity.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 03:51:43 2010
@@ -90,7 +90,7 @@ trait BaseService extends Service {
         done
         error("start should not be called from state: "+state);
     }
-  } ->: dispatchQueue
+  } |>>: dispatchQueue
 
   final def stop(onCompleted:Runnable) = ^{
     def done = {
@@ -115,9 +115,9 @@ trait BaseService extends Service {
         done
         error("stop should not be called from state: "+state);
     }
-  } ->: dispatchQueue
+  } |>>: dispatchQueue
 
   protected def _start(onCompleted:Runnable)
   protected def _stop(onCompleted:Runnable)
 
-}
\ No newline at end of file
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:51:43 2010
@@ -198,50 +198,50 @@ class Broker() extends Service with Disp
       if( clientConnections.remove(connection) ) {
         connection.dispatchQueue.release
       }
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def removeConnectUri(uri: String): Unit = ^ {
       connectUris.remove(uri)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
       virtualHosts.get(name)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
       new ArrayList(connectUris)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
 
     def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
       defaultVirtualHost
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def addVirtualHost(host: VirtualHost) = ^ {
       Broker.this.addVirtualHost(host)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
-    def getState(cb: (String) => Unit) = callback(cb) {state} ->: dispatchQueue
+    def getState(cb: (String) => Unit) = callback(cb) {state} >>: dispatchQueue
 
     def addConnectUri(uri: String) = ^ {
       connectUris.add(uri)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def getName(cb: (String) => Unit) = callback(cb) {
       name;
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
       new ArrayList[VirtualHost](virtualHosts.values)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
       new ArrayList[TransportServer](transportServers)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def start(onCompleted:Runnable) = ^ {
       _start(onCompleted)
-    } ->: dispatchQueue
+    } >>: dispatchQueue
 
     def _start(onCompleted:Runnable) = {
       if (state == CONFIGURATION) {
@@ -314,7 +314,7 @@ class Broker() extends Service with Disp
         }
 
       }
-    } ->: dispatchQueue
+    } >>: dispatchQueue
     
   }
 
@@ -389,7 +389,7 @@ class Queue(val destination:Destination)
     def deliver(value:Delivery):Unit = {
       val delivery = Delivery(value)
       delivery.setDisposer(^{
-        ^{ completed(value) } ->:queue
+        ^{ completed(value) } >>:queue
       })
       consumer.deliver(delivery);
       delivery.release
@@ -415,7 +415,7 @@ class Queue(val destination:Destination)
         readyConsumers.addLast(cs)
       }
       drain_delivery_buffer
-    } ->: queue
+    } >>: queue
 
   def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
       for ( consumer <- consumers ) {
@@ -428,7 +428,7 @@ class Queue(val destination:Destination)
           case None=>
         }
       }
-    } ->: queue
+    } >>: queue
 
   def disconnected() = throw new RuntimeException("unsupported")
 
@@ -471,7 +471,7 @@ class Queue(val destination:Destination)
 //
 //    def deliver(delivery:Delivery) = using(delivery) {
 //      deliveryQueue.send(delivery)
-//    } ->: queue
+//    } >>: queue
 //
 //    def close = {
 //      release
@@ -601,4 +601,4 @@ class XQueue(val destination:Destination
   def getDestination() = destination
 
   def shutdown = {}
-}
\ No newline at end of file
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala Wed Jul  7 03:51:43 2010
@@ -50,7 +50,7 @@ class CompletionTracker(val name:String,
     ^ {
       assert(_callback==null)
       tasks.add(rc)
-    }  ->: queue
+    }  >>: queue
     return rc
   }
 
@@ -59,7 +59,7 @@ class CompletionTracker(val name:String,
     ^ {
       _callback = handler _
       checkDone()
-    }  ->: queue
+    }  >>: queue
 
     def displayStatus = {
       if( _callback!=null ) {
@@ -76,12 +76,12 @@ class CompletionTracker(val name:String,
     if( tasks.remove(r) ) {
       checkDone()
     }
-  } ->: queue
+  } >>: queue
 
   private def checkDone() = {
     if( tasks.isEmpty && _callback!=null ) {
       trace("executing callback for %s", name)
-      _callback ->: queue
+      _callback >>: queue
       _callback = null
       queue.release
     } else {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:51:43 2010
@@ -548,6 +548,7 @@ class DeliverySessionManager(val sink:De
       });
       credit_adder.resume
       source.retain
+      var closed = false
 
       private var credits = 0;
 
@@ -557,25 +558,28 @@ class DeliverySessionManager(val sink:De
       def close = {
         credit_adder.release
         source.release
+        closed=true
       }
 
       override def full = credits <= 0
 
       override protected def send_to_delivery_buffer(value:Delivery) = {
-        var delivery = Delivery(value)
-        credit_adder.retain
-        delivery.setDisposer(^{
-          // This is called from the server/consumer thread
-          credit_adder.merge(delivery.size);
-          credit_adder.release
-        })
-        internal_credit(-delivery.size)
-        source.merge(delivery)
+        if( !closed ) {
+          var delivery = Delivery(value)
+          credit_adder.retain
+          delivery.setDisposer(^{
+            // This is called from the server/consumer thread
+            credit_adder.merge(delivery.size);
+            credit_adder.release
+          })
+          internal_credit(-delivery.size)
+          source.merge(delivery)
+        }
       }
 
       def internal_credit(value:Int) = {
         credits += value;
-        if( credits <= 0 ) {
+        if( closed || credits <= 0 ) {
           credits = 0
         } else {
           drainOverflow
@@ -585,7 +589,7 @@ class DeliverySessionManager(val sink:De
       ///////////////////////////////////////////////////
       // These methods get called from the server/consumer thread...
       ///////////////////////////////////////////////////
-      def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue
+      def credit(value:Int) = ^{ internal_credit(value) } >>: producer_queue
 
       def drain(callback:Runnable) = {
         credits = 0

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:51:43 2010
@@ -156,13 +156,13 @@ class Router(var queue:DispatchQueue) ex
 
   def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
       get(destination).on_bind(targets)
-    } ->: queue
+    } >>: queue
 
   def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
       if( get(destination).on_unbind(targets) ) {
         destinations.remove(destination)
       }
-    } ->: queue
+    } >>: queue
 
   def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
     val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
@@ -172,7 +172,7 @@ class Router(var queue:DispatchQueue) ex
     }
     ^ {
       get(destination).on_connect(route)
-    } ->: queue
+    } >>: queue
   }
 
   def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
@@ -180,7 +180,7 @@ class Router(var queue:DispatchQueue) ex
 
   def disconnect(route:DeliveryProducerRoute) = releasing(route) {
       get(route.destination).on_disconnect(route)
-    } ->: queue
+    } >>: queue
 
 
    def each(proc:(Destination, DestinationNode)=>Unit) = {
@@ -227,11 +227,11 @@ class DeliveryProducerRoute(val destinat
   def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
     on_connected
-  } ->: queue
+  } >>: queue
 
   def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
-  } ->: queue
+  } >>: queue
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
@@ -249,7 +249,7 @@ class DeliveryProducerRoute(val destinat
       }
       rc
     }
-  } ->: queue
+  } >>: queue
 
   def disconnected() = ^ {
     this.targets.foreach { x=>
@@ -257,7 +257,7 @@ class DeliveryProducerRoute(val destinat
       x.close
       x.consumer.release
     }    
-  } ->: queue
+  } >>: queue
 
   protected def on_connected = {}
   protected def on_disconnected = {}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:51:43 2010
@@ -35,8 +35,8 @@ import org.scalatest._
 import _root_.org.fusesource.hawtbuf._
 
 object BaseBrokerPerfSupport {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "1"))
-  var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000"))
   var IO_WORK_AMOUNT = 0
   var FANIN_COUNT = 10
   var FANOUT_COUNT = 10

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:51:43 2010
@@ -125,7 +125,7 @@ class StompProtocolHandler extends Proto
         host.router.unbind(consumer.destination, consumer::Nil)
         consumer=null
       }
-      info("stomp protocol resources released")
+      trace("stomp protocol resources released")
     }
   }
 
@@ -199,7 +199,7 @@ class StompProtocolHandler extends Proto
 //                read_source.setTargetQueue(queue)
 //              }
 
-            } ->: queue
+            } >>: queue
           }
 
           // don't process frames until we are connected..
@@ -280,12 +280,13 @@ class StompProtocolHandler extends Proto
       connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
       ^ {
         connection.stop()
-      } ->: queue
+      } >>: queue
     }
   }
 
   override def onTransportFailure(error: IOException) = {
     if( !connection.stopped ) {
+      connection.transport.suspendRead
       info(error, "Shutting connection down due to: %s", error)
       super.onTransportFailure(error);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:51:43 2010
@@ -44,9 +44,6 @@ import java.util.Map;
  */
 public class TcpTransport extends BaseService implements Transport {
 
-    static {
-        System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
-    }
     private static final Log LOG = LogFactory.getLog(TcpTransport.class);
 
     private Map<String, Object> socketOptions;
@@ -450,7 +447,7 @@ public class TcpTransport extends BaseSe
     }
 
     private void drainInbound() throws IOException {
-        if (getServiceState() == STARTED || readSource.isSuspended()) {
+        if (!getServiceState().isStarted() || readSource.isSuspended()) {
             return;
         }
         while (true) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java?rev=961090&r1=961089&r2=961090&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/apollo/BaseService.java Wed Jul  7 03:51:43 2010
@@ -30,10 +30,13 @@ import java.util.LinkedList;
  */
 public abstract class BaseService implements Service {
 
-    static class State {
+    public static class State {
         public String toString() {
             return getClass().getSimpleName();
         }
+        public boolean isStarted() {
+            return false;
+        }
     }
 
     static class CallbackSupport extends State {
@@ -55,7 +58,11 @@ public abstract class BaseService implem
     public static final State CREATED = new State();
     public static class STARTING extends CallbackSupport {
     }
-    public static final State STARTED = new State();
+    public static final State STARTED = new State() {
+        public boolean isStarted() {
+            return true;
+        }
+    };
     public static class STOPPING extends CallbackSupport {
     }