You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/27 21:53:24 UTC
svn commit: r979831 - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
apollo-web/src/main/scala/org/apache/activemq/apollo/web/ ap...
Author: chirino
Date: Tue Jul 27 19:53:24 2010
New Revision: 979831
URL: http://svn.apache.org/viewvc?rev=979831&view=rev
Log:
Merge remote branch 'local/master'
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue Jul 27 19:53:24 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.broke
import _root_.java.io.{File}
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
import org.fusesource.hawtdispatch.{Dispatch}
import org.fusesource.hawtbuf._
import AsciiBuffer._
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jul 27 19:53:24 2010
@@ -19,16 +19,17 @@ package org.apache.activemq.apollo.broke
import java.util.concurrent.TimeUnit
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
import java.util.concurrent.atomic.AtomicInteger
import collection.{SortedMap}
-import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import org.apache.activemq.apollo.store.{StoreUOW}
import protocol.ProtocolFactory
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
+import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Tue Jul 27 19:53:24 2010
@@ -20,6 +20,7 @@ import _root_.java.util.concurrent.atomi
import _root_.org.fusesource.hawtbuf._
import _root_.org.fusesource.hawtdispatch._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
import collection.JavaConversions
import org.apache.activemq.apollo.util._
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Jul 27 19:53:24 2010
@@ -26,8 +26,10 @@ import org.apache.activemq.apollo.dto.{V
import java.util.concurrent.TimeUnit
import org.apache.activemq.apollo.store.{Store, StoreFactory}
import org.apache.activemq.apollo.util._
+import path.PathFilter
import ReporterLevel._
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import collection.JavaConversions
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -213,40 +215,38 @@ class VirtualHost(val broker: Broker, va
// rates between producers and consumers, look for natural data flow partitions
// and then try to equally divide the load over the available processing
// threads/cores.
-// router.destinations.valuesIterator.foreach { node =>
- // todo
-// if( node.get_queue==null ) {
-// // Looks like a topic destination...
-//
-// // 1->1 is the easy case...
-// if( node.direct_consumers.size==1 && node.producers.size==1 ) {
-// // move the producer to the consumer thread.
-// node.producers.head.producer.collocate( node.direct_consumers.head.dispatchQueue )
-// } else {
-// // we need to get fancy perhaps look at rates
-// // to figure out how to be group the connections.
-// }
-// } else {
-// // Looks like a queue destination...
-//
-// if( node.direct_consumers.size==1 ) {
-// // move the queue to the consumer
-// node.get_queue.collocate( node.direct_consumers.head.dispatchQueue )
-// } else {
-// // we need to get fancy perhaps look at rates
-// // to figure out how to be group the connections.
-// }
-//
-// if( node.producers.size==1 ) {
-// // move the producer to the queue.
-// node.producers.head.producer.collocate( node.get_queue.dispatchQueue )
-// } else {
-// // we need to get fancy perhaps look at rates
-// // to figure out how to be group the connections.
-// }
-//
-// }
-// }
+ val nodes = router.destinations.get(PathFilter.ANY_DESCENDENT)
+
+ JavaConversions.asIterable(nodes).foreach { node =>
+
+ // For the topics, just collocate the producers onto the first consumer's
+ // thread.
+ node.broadcast_consumers.firstOption.foreach{ consumer =>
+ node.broadcast_producers.foreach { r=>
+ r.producer.collocate(consumer.dispatchQueue)
+ }
+ }
+
+ node.queues.foreach { queue=>
+
+ queue.dispatchQueue {
+
+ // Collocate the queue's with the first consumer
+ // TODO: change this so it collocates with the fastest consumer.
+
+ queue.all_subscriptions.firstOption.map( _._1 ).foreach { consumer=>
+ queue.collocate( consumer.dispatchQueue )
+ }
+
+ // Collocate all the producers with the queue..
+
+ queue.inbound_sessions.foreach { session =>
+ session.producer.collocate( queue.dispatchQueue )
+ }
+ }
+
+ }
+ }
schedualConnectionRegroup
}
dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Tue Jul 27 19:53:24 2010
@@ -57,6 +57,7 @@ object StompLoadClient {
var durable = false
var destinationType = "queue"
+ var destinationName = "load"
var destinationCount = 1
val producerCounter = new AtomicLong()
@@ -146,6 +147,7 @@ object StompLoadClient {
"uri = "+uri+"\n"+
"destinationType = "+destinationType+"\n"+
"destinationCount = "+destinationCount+"\n" +
+ "destinationName = "+destinationName+"\n" +
"sampleInterval = "+sampleInterval+"\n" +
"\n"+
"--- Producer Properties ---\n"+
@@ -172,7 +174,7 @@ object StompLoadClient {
println("%s rate: %,.3f per second, total: %,d".format(name, rate_per_second, totalCount))
}
- def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
+ def destination(i:Int) = "/"+destinationType+"/"+destinationName+"-"+(i%destinationCount)
class StompClient {
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala Tue Jul 27 19:53:24 2010
@@ -22,6 +22,7 @@ import org.apache.activemq.apollo.dto.{X
import java.util.regex.Pattern
import javax.xml.stream.{XMLOutputFactory, XMLInputFactory}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
import java.util.concurrent.{TimeUnit, ExecutorService, Executors}
import org.fusesource.hawtbuf.{ByteArrayInputStream, ByteArrayOutputStream}
import javax.xml.bind.{Marshaller, JAXBContext}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml Tue Jul 27 19:53:24 2010
@@ -23,7 +23,10 @@
%p queue size: #{queue_items} messages
%p queue size: #{memory(queue_size)}
-%p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
+- if( capacity > 0 )
+ %p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
+- else
+ %p memory used: #{ "%,.2f".format(0f) }% (#{memory(capacity_used)}/#{memory(capacity)})
%h2 Enqueue/Deqeueue Counters
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=979831&r1=979830&r2=979831&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Tue Jul 27 19:53:24 2010
@@ -99,8 +99,8 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtdispatch-version>1.0-SNAPSHOT</hawtdispatch-version>
- <hawtdb-version>1.2-SNAPSHOT</hawtdb-version>
+ <hawtdispatch-version>1.0</hawtdispatch-version>
+ <hawtdb-version>1.3-SNAPSHOT</hawtdb-version>
<hawtbuf-version>1.1</hawtbuf-version>
<jetty-version>6.1.22</jetty-version>