You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:09:06 UTC

svn commit: r961136 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-stomp/src/main/scala/org/apache/activem...

Author: chirino
Date: Wed Jul  7 04:09:06 2010
New Revision: 961136

URL: http://svn.apache.org/viewvc?rev=961136&view=rev
Log:
- fix deprecation warnings.
- fix persistent slowdown when the queue memory limit was large

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:09:06 2010
@@ -107,7 +107,7 @@ class Queue(val host: VirtualHost, val d
   /**
    * Tunning options.
    */
-  var tune_max_size = 1024 * 32
+  var tune_max_size = 1024 * 1024 * 4
   var tune_subscription_prefetch = 1024*32
   var tune_max_outbound_size = 1024 * 1204 * 5
   var tune_swap_delay = 100L
@@ -170,12 +170,33 @@ class Queue(val host: VirtualHost, val d
         if( !entry.hasSubs ) {
           // we flush the entry out right away if it looks
           // it wont be needed.
+
           if( entry.getPrevious.isFlushedOrFlushing ) {
+            // in this case take it out of memory too...
             flushingSize += entry.flush
           } else {
+            if( slow_consumers ) {
+              if( delivery.storeBatch!=null ) {
+                // just make it hit the disk quick.. but keep it in memory.
+                delivery.storeBatch.eagerFlush(^{})
+              }
+            } else {
+              if( !checking_for_slow_consumers ) {
+                checking_for_slow_consumers=true
+                val tail_consumer_counter_copy = tail_consumer_counter
+                dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
+                  if( tail_consumer_counter_copy == tail_consumer_counter ) {
+                    slow_consumers = true
+                  }
+                  checking_for_slow_consumers = false
+                })
+              }
+            }
             swap_check=true
           }
         } else {
+          slow_consumers = false
+          tail_consumer_counter += 1
           //  entry.dispatch==null if the entry was fully dispatched
           swap_check = entry.dispatch!=null
         }
@@ -204,6 +225,10 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
+  var tail_consumer_counter = 0L
+  var checking_for_slow_consumers = false
+  var slow_consumers = false
+
   def ack(entry: QueueEntry, sb:StoreBatch) = {
     if (entry.ref != -1) {
       val storeBatch = if( sb == null ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:09:06 2010
@@ -195,22 +195,21 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     flush
   }
 
-  def addQueue(record: QueueRecord) = {
+  def addQueue(record: QueueRecord, callback:Runnable) = {
     val update = new AddQueue.Bean()
     update.setKey(record.key)
     update.setName(record.name)
     update.setQueueType(record.queueType)
-    store(update)
+    _store(update, callback)
   }
 
-  def removeQueue(queueKey: Long):Boolean = {
+  def removeQueue(queueKey: Long, callback:Runnable) = {
     val update = new RemoveQueue.Bean()
     update.setKey(queueKey)
-    store(update)
-    true
+    _store(update, callback)
   }
 
-  def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+  def store(txs: Seq[HawtDBStore#HawtDBBatch], callback:Runnable) {
     var batch = ListBuffer[TypeCreatable]()
     txs.foreach {
       tx =>
@@ -233,15 +232,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             }
         }
     }
-    store(batch, ^{
-      txs.foreach { tx=>
-        tx.onPerformed
-      }
-    })
+    _store(batch, callback)
   }
 
+
   def purge(callback: Runnable) = {
-    store(new Purge.Bean(), callback)
+    _store(new Purge.Bean(), callback)
   }
 
   def listQueues: Seq[Long] = {
@@ -344,29 +340,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
-  private def store(updates: List[TypeCreatable]): Unit = {
-    val tracker = new TaskTracker("storing")
-    store(updates, tracker.task(updates))
-    tracker.await
-  }
-
-  private def store(update: TypeCreatable): Unit = {
-    val tracker = new TaskTracker("storing")
-    store(update, tracker.task(update))
-    tracker.await
-  }
-
-  private def store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
+  private def _store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
     val batch = next_batch_id
     begin(batch)
     updates.foreach {
       update =>
-        store(batch, update, null)
+        _store(batch, update, null)
     }
     commit(batch, onComplete)
   }
 
-  private def store(update: TypeCreatable, onComplete: Runnable): Unit = store(-1, update, onComplete)
+  private def _store(update: TypeCreatable, onComplete: Runnable): Unit = _store(-1, update, onComplete)
 
   /**
    * All updated are are funneled through this method. The updates are logged to
@@ -375,7 +359,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
    *
    * @throws IOException
    */
-  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
+  private def _store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
     val kind = update.asInstanceOf[TypeCreatable]
     val frozen = update.freeze
     val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
@@ -452,7 +436,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val start = System.currentTimeMillis()
     incrementalRecover
 
-    store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
+    _store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
       // Rollback any batches that did not complete.
       batches.keysIterator.foreach {
         batch =>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:09:06 2010
@@ -21,8 +21,6 @@ import org.fusesource.hawtdispatch.BaseR
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
-import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
-import org.apache.activemq.apollo.util.IntCounter
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
 import collection.{JavaConversions, Seq}
@@ -30,6 +28,8 @@ import org.fusesource.hawtdispatch.Scala
 import org.apache.activemq.apollo.broker._
 import java.io.File
 import ReporterLevel._
+import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
+import java.util.concurrent._
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -91,7 +91,13 @@ class HawtDBStore extends Store with Bas
   }
 
   protected def _start(onCompleted: Runnable) = {
-    executor_pool = Executors.newFixedThreadPool(20)
+    executor_pool = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+      def newThread(r: Runnable) = {
+        val rc = new Thread(r, "hawtdb store client")
+        rc.setDaemon(true)
+        rc
+      }
+    })
     client.config = config
     schedualDisplayStats
     executor_pool {
@@ -135,16 +141,11 @@ class HawtDBStore extends Store with Bas
   def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
     val key = next_queue_key.getAndIncrement
     record.key = key
-    executor_pool ^{
-      client.addQueue(record)
-      callback(Some(key))
-    }
+    client.addQueue(record, ^{ callback(Some(key)) })
   }
 
   def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
-    executor_pool ^{
-      callback(client.removeQueue(queueKey))
-    }
+    client.removeQueue(queueKey,^{ callback(true) })
   }
 
   def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
@@ -318,8 +319,9 @@ class HawtDBStore extends Store with Bas
 
         def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
 
-        info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f",
-          rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency)
+
+        info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
+          rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
         schedualDisplayStats
       }
     }
@@ -390,6 +392,8 @@ class HawtDBStore extends Store with Bas
   flush_source.setEventHandler(^{drain_flushes});
   flush_source.resume
 
+  val storeLatency = new TimeCounter
+
   def drain_flushes:Unit = {
 
     if( !serviceState.isStarted ) {
@@ -420,7 +424,16 @@ class HawtDBStore extends Store with Bas
     }
 
     if( !txs.isEmpty ) {
-      client.store(txs)
+      storeLatency.start { end=>
+        client.store(txs, ^{
+          dispatchQueue {
+            end()
+            txs.foreach { tx=>
+              tx.onPerformed
+            }
+          }
+        })
+      }
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 04:09:06 2010
@@ -39,11 +39,11 @@ object StompBroker {
     connector.protocol = "stomp"
     connector.advertise = uri
 
-    val store = new CassandraStoreDTO
-    store.hosts.add("localhost:9160")
+//    val store = new CassandraStoreDTO
+//    store.hosts.add("localhost:9160")
 
-//    val store = new HawtDBStoreDTO
-//    store.directory = new File("activemq-data")
+    val store = new HawtDBStoreDTO
+    store.directory = new File("activemq-data")
     
     broker.config.virtualHosts.get(0).store = store
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala Wed Jul  7 04:09:06 2010
@@ -42,7 +42,7 @@ case class ClassFinder[T](path:String, l
           classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
         }
       }
-      classNames = classNames.removeDuplicates
+      classNames = classNames.distinct
 
       classes :::= classNames.map { name=>
         loader.loadClass(name).asInstanceOf[Class[T]]
@@ -50,7 +50,7 @@ case class ClassFinder[T](path:String, l
 
     }
 
-    return classes.removeDuplicates
+    return classes.distinct
   }
 
   private def loadProperties(is:InputStream):Properties = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala Wed Jul  7 04:09:06 2010
@@ -19,14 +19,14 @@ package org.apache.activemq.apollo.util
 import java.util.concurrent.TimeUnit
 
 /**
- * <p>A Timer collects time durations and produces a TimingMetric.</p>
+ * <p>A Timer collects time durations and produces a TimeMetric.</p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class TimeCounter extends MetricProducer[TimeMetric] {
 
-  private var maximum = Math.MIN_LONG
-  private var minimum = Math.MAX_LONG
+  private var maximum = Long.MinValue
+  private var minimum = Long.MaxValue
   private var total = 0L
   private var count = 0
 
@@ -39,8 +39,8 @@ class TimeCounter extends MetricProducer
   }
 
   def clear() = {
-    maximum = Math.MIN_INT
-    minimum = Math.MAX_INT
+    maximum = Long.MinValue
+    minimum = Long.MaxValue
     total = 0L
     count = 0
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java Wed Jul  7 04:09:06 2010
@@ -254,7 +254,7 @@ public final class IntrospectionSupport 
     }
 
     public static String toString(Object target) {
-        return toString(target, Object.class, null, null);
+        return toString(target, Object.class, null, (String[])null);
     }
     
     public static String toString(Object target, String...fields) {
@@ -262,7 +262,7 @@ public final class IntrospectionSupport 
     }
     
     public static String toString(Object target, Class<?> stopClass) {
-    	return toString(target, stopClass, null, null);
+    	return toString(target, stopClass, null, (String[])null);
     }
 
     public static String toString(Object target, Map<String, Object> overrideFields, String...fields) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java Wed Jul  7 04:09:06 2010
@@ -115,16 +115,16 @@ public abstract class CombinationTestSup
 
     private CombinationTestSupport[] getCombinations() {
         try {
-            Method method = getClass().getMethod("initCombos", null);
-            method.invoke(this, null);
+            Method method = getClass().getMethod("initCombos", (Class[])null);
+            method.invoke(this, (Object[])null);
         } catch (Throwable e) {
         }
 
         String name = getName().split(" ")[0];
         String comboSetupMethodName = "initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
         try {
-            Method method = getClass().getMethod(comboSetupMethodName, null);
-            method.invoke(this, null);
+            Method method = getClass().getMethod(comboSetupMethodName, (Class[])null);
+            method.invoke(this, (Object[])null);
         } catch (Throwable e) {
         }