You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/27 21:53:24 UTC

svn commit: r979831 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/ ap...

Author: chirino
Date: Tue Jul 27 19:53:24 2010
New Revision: 979831

URL: http://svn.apache.org/viewvc?rev=979831&view=rev
Log:
Merge remote branch 'local/master'

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue Jul 27 19:53:24 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.broke
 import _root_.java.io.{File}
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
 import org.fusesource.hawtdispatch.{Dispatch}
 import org.fusesource.hawtbuf._
 import AsciiBuffer._

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jul 27 19:53:24 2010
@@ -19,16 +19,17 @@ package org.apache.activemq.apollo.broke
 import java.util.concurrent.TimeUnit
 
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
 import java.util.concurrent.atomic.AtomicInteger
 
 import collection.{SortedMap}
-import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 import org.apache.activemq.apollo.store.{StoreUOW}
 import protocol.ProtocolFactory
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
+import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Tue Jul 27 19:53:24 2010
@@ -20,6 +20,7 @@ import _root_.java.util.concurrent.atomi
 import _root_.org.fusesource.hawtbuf._
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
 
 import collection.JavaConversions
 import org.apache.activemq.apollo.util._

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Jul 27 19:53:24 2010
@@ -26,8 +26,10 @@ import org.apache.activemq.apollo.dto.{V
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.store.{Store, StoreFactory}
 import org.apache.activemq.apollo.util._
+import path.PathFilter
 import ReporterLevel._
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import collection.JavaConversions
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -213,40 +215,38 @@ class VirtualHost(val broker: Broker, va
       // rates between producers and consumers, look for natural data flow partitions
       // and then try to equally divide the load over the available processing
       // threads/cores.
-//      router.destinations.valuesIterator.foreach { node =>
-        // todo
-//        if( node.get_queue==null ) {
-//          // Looks like a topic destination...
-//
-//          // 1->1 is the easy case...
-//          if( node.direct_consumers.size==1 && node.producers.size==1 ) {
-//            // move the producer to the consumer thread.
-//            node.producers.head.producer.collocate( node.direct_consumers.head.dispatchQueue )
-//          } else {
-//            // we need to get fancy perhaps look at rates
-//            // to figure out how to be group the connections.
-//          }
-//        } else {
-//          // Looks like a queue destination...
-//
-//          if( node.direct_consumers.size==1 ) {
-//            // move the queue to the consumer
-//            node.get_queue.collocate( node.direct_consumers.head.dispatchQueue )
-//          } else {
-//            // we need to get fancy perhaps look at rates
-//            // to figure out how to be group the connections.
-//          }
-//
-//          if( node.producers.size==1 ) {
-//            // move the producer to the queue.
-//            node.producers.head.producer.collocate( node.get_queue.dispatchQueue )
-//          } else {
-//            // we need to get fancy perhaps look at rates
-//            // to figure out how to be group the connections.
-//          }
-//
-//        }
-//      }
+      val nodes = router.destinations.get(PathFilter.ANY_DESCENDENT)
+
+      JavaConversions.asIterable(nodes).foreach { node =>
+
+        // For the topics, just collocate the producers onto the first consumer's
+        // thread.
+        node.broadcast_consumers.firstOption.foreach{ consumer =>
+          node.broadcast_producers.foreach { r=>
+            r.producer.collocate(consumer.dispatchQueue)
+          }
+        }
+
+        node.queues.foreach { queue=>
+
+          queue.dispatchQueue {
+
+            // Collocate the queue's with the first consumer
+            // TODO: change this so it collocates with the fastest consumer.
+
+            queue.all_subscriptions.firstOption.map( _._1 ).foreach { consumer=>
+              queue.collocate( consumer.dispatchQueue )
+            }
+
+            // Collocate all the producers with the queue..
+
+            queue.inbound_sessions.foreach { session =>
+              session.producer.collocate( queue.dispatchQueue )
+            }
+          }
+
+        }
+      }
       schedualConnectionRegroup
     }
     dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Tue Jul 27 19:53:24 2010
@@ -57,6 +57,7 @@ object StompLoadClient {
   var durable = false
 
   var destinationType = "queue"
+  var destinationName = "load"
   var destinationCount = 1
 
   val producerCounter = new AtomicLong()
@@ -146,6 +147,7 @@ object StompLoadClient {
     "uri              = "+uri+"\n"+
     "destinationType  = "+destinationType+"\n"+
     "destinationCount = "+destinationCount+"\n" +
+    "destinationName  = "+destinationName+"\n" +
     "sampleInterval   = "+sampleInterval+"\n" +
     "\n"+
     "--- Producer Properties ---\n"+
@@ -172,7 +174,7 @@ object StompLoadClient {
     println("%s rate: %,.3f per second, total: %,d".format(name, rate_per_second, totalCount))
   }
 
-  def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
+  def destination(i:Int) = "/"+destinationType+"/"+destinationName+"-"+(i%destinationCount)
 
 
   class StompClient {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala Tue Jul 27 19:53:24 2010
@@ -22,6 +22,7 @@ import org.apache.activemq.apollo.dto.{X
 import java.util.regex.Pattern
 import javax.xml.stream.{XMLOutputFactory, XMLInputFactory}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
 import java.util.concurrent.{TimeUnit, ExecutorService, Executors}
 import org.fusesource.hawtbuf.{ByteArrayInputStream, ByteArrayOutputStream}
 import javax.xml.bind.{Marshaller, JAXBContext}

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml Tue Jul 27 19:53:24 2010
@@ -23,7 +23,10 @@
 
 %p queue size: #{queue_items} messages
 %p queue size: #{memory(queue_size)}
-%p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
+- if( capacity > 0 )
+  %p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
+- else
+  %p memory used: #{ "%,.2f".format(0f) }% (#{memory(capacity_used)}/#{memory(capacity)})
 
 %h2 Enqueue/Deqeueue Counters
 

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Tue Jul 27 19:53:24 2010
@@ -99,8 +99,8 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.0-SNAPSHOT</hawtdispatch-version>
-    <hawtdb-version>1.2-SNAPSHOT</hawtdb-version>
+    <hawtdispatch-version>1.0</hawtdispatch-version>
+    <hawtdb-version>1.3-SNAPSHOT</hawtdb-version>
     <hawtbuf-version>1.1</hawtbuf-version>
     
     <jetty-version>6.1.22</jetty-version>