You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/18 04:46:09 UTC

svn commit: r1147724 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/

Author: chirino
Date: Mon Jul 18 02:46:06 2011
New Revision: 1147724

URL: http://svn.apache.org/viewvc?rev=1147724&view=rev
Log:
Switch to hawtdispatch 1.4 for it's support of ordered dispatch sources.
Improve the LoggingTracker so it's less noisy.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LoggingTracker.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1147724&r1=1147723&r2=1147724&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Mon Jul 18 02:46:06 2011
@@ -141,6 +141,7 @@ object BrokerRegistry extends Log {
 object Broker extends Log {
 
   val BLOCKABLE_THREAD_POOL = ApolloThreadPool.INSTANCE
+  private val SERVICE_TIMEOUT = 1000*5;
 
   def class_loader:ClassLoader = ClassFinder.class_loader
 
@@ -269,7 +270,7 @@ class Broker() extends BaseService {
     dispatch_queue.assertExecuting()
     this.config = config
 
-    val tracker = new LoggingTracker("broker reconfiguration", console_log, dispatch_queue)
+    val tracker = new LoggingTracker("broker reconfiguration", console_log, SERVICE_TIMEOUT)
     if( service_state.is_started ) {
       apply_update(tracker)
     }
@@ -285,14 +286,14 @@ class Broker() extends BaseService {
     init_dispatch_queue(dispatch_queue)
     BrokerRegistry.add(this)
 
-    val tracker = new LoggingTracker("broker startup", console_log, dispatch_queue)
+    val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
     apply_update(tracker)
     tracker.callback(on_completed)
 
   }
 
   def _stop(on_completed:Runnable): Unit = {
-    val tracker = new LoggingTracker("broker shutdown", console_log, dispatch_queue)
+    val tracker = new LoggingTracker("broker shutdown", console_log, SERVICE_TIMEOUT)
 
     // Stop the services...
     services.foreach( x=>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1147724&r1=1147723&r2=1147724&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Mon Jul 18 02:46:06 2011
@@ -758,7 +758,7 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   protected def _start(on_completed: Runnable) = {
-    val tracker = new LoggingTracker("router startup", virtual_host.console_log, dispatch_queue)
+    val tracker = new LoggingTracker("router startup", virtual_host.console_log)
     if( virtual_host.store!=null ) {
       val task = tracker.task("list_queues")
       virtual_host.store.list_queues { queue_keys =>
@@ -1074,7 +1074,7 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def apply_update(on_completed:Runnable) = {
-    val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log, dispatch_queue)
+    val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log)
     topic_domain.apply_update(tracker)
     queue_domain.apply_update(tracker)
     // we may need to create some more destinations.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1147724&r1=1147723&r2=1147724&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Jul 18 02:46:06 2011
@@ -123,7 +123,7 @@ class VirtualHost(val broker: Broker, va
     router = RouterFactory.create(this)
     store = StoreFactory.create(config.store)
 
-    val tracker = new LoggingTracker("virtual host startup", console_log, dispatch_queue)
+    val tracker = new LoggingTracker("virtual host startup", console_log)
     if( store!=null ) {
       val task = tracker.task("store startup")
       console_log.info("Starting store: "+store)
@@ -152,7 +152,7 @@ class VirtualHost(val broker: Broker, va
     }
 
     tracker.callback {
-      val tracker = new LoggingTracker("virtual host startup", console_log, dispatch_queue)
+      val tracker = new LoggingTracker("virtual host startup", console_log)
       tracker.start(router)
       tracker.callback(on_completed)
     }
@@ -162,7 +162,7 @@ class VirtualHost(val broker: Broker, va
 
   override protected def _stop(on_completed:Runnable):Unit = {
 
-    val tracker = new LoggingTracker("virtual host shutdown", console_log, dispatch_queue)
+    val tracker = new LoggingTracker("virtual host shutdown", console_log)
     tracker.stop(router);
     if( store!=null ) {
       tracker.stop(store);

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LoggingTracker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LoggingTracker.scala?rev=1147724&r1=1147723&r2=1147724&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LoggingTracker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LoggingTracker.scala Mon Jul 18 02:46:06 2011
@@ -27,17 +27,39 @@ import org.fusesource.hawtdispatch.{Task
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LoggingTracker(name:String, val log:Log=Log(classOf[LoggingTracker]), parent:DispatchQueue=globalQueue) extends TaskTracker(name, parent) {
+class LoggingTracker(name:String, val log:Log=Log(classOf[LoggingTracker]), timeout:Long=1000) extends TaskTracker(name, timeout) {
   assert(log!=null)
   import log._
 
-  timeout = 1000;
+  var status:Option[(Long, List[String])] = None
 
-  override protected def onTimeout(duration:Long, tasks: List[String]):Long = {
-    info("%s is taking a long time (%d seconds). Waiting on %s", name, (duration/1000), tasks.mkString(", "))
+  override protected def onTimeout(startedAt:Long, tasks: List[String]):Long = {
+    status match {
+      case None =>
+        info("%s is waiting on %s", name, tasks.mkString(", "))
+        status = Some((startedAt, tasks))
+      case Some(data)=>
+        if( data._2 != tasks ) {
+          info("%s is now waiting on %s", name, tasks.mkString(", "))
+          status = Some((startedAt, tasks))
+        }
+    }
     timeout
   }
 
+
+  override def callback(handler: Runnable) {
+    super.callback(^{
+      status match {
+        case None =>
+        case Some(data)=>
+          info("%s is no longer waiting.  It waited a total of %d seconds.", ((System.currentTimeMillis()-data._1)/1000))
+          status = None
+      }
+      handler.run();
+    })
+  }
+
   def start(service:Service) = {
     service.start(task("start "+service))
   }

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1147724&r1=1147723&r2=1147724&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Jul 18 02:46:06 2011
@@ -96,8 +96,8 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.3</hawtdispatch-version>
     <hawtbuf-version>1.5</hawtbuf-version>
+    <hawtdispatch-version>1.4-SNAPSHOT</hawtdispatch-version>
     
     <jdbm-version>2.0.1</jdbm-version>
     <bdb-version>4.1.10</bdb-version>